Skip to content

Commit

Permalink
add multiplexer proxy.
Browse files Browse the repository at this point in the history
  • Loading branch information
zyjhtangtang committed Nov 14, 2024
1 parent 7ba1591 commit d6c4a47
Show file tree
Hide file tree
Showing 24 changed files with 2,020 additions and 85 deletions.
35 changes: 35 additions & 0 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
apiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
Expand All @@ -54,11 +55,26 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/filter/manager"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/multiplexer"
"github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage"
"github.com/openyurtio/openyurt/pkg/yurthub/network"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

var DefaultMultiplexerResources = []schema.GroupVersionResource{
{
Group: "",
Version: "v1",
Resource: "services",
},
{
Group: "discovery.k8s.io",
Version: "v1",
Resource: "endpointslices",
},
}

// YurtHubConfiguration represents configuration of yurthub
type YurtHubConfiguration struct {
LBMode string
Expand Down Expand Up @@ -101,6 +117,9 @@ type YurtHubConfiguration struct {
CoordinatorClient kubernetes.Interface
LeaderElection componentbaseconfig.LeaderElectionConfiguration
HostControlPlaneAddr string // ip:port
PostStartHooks map[string]func() error
MultiplexerCacheManager multiplexer.MultiplexerManager
MultiplexerResources []schema.GroupVersionResource
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand Down Expand Up @@ -176,6 +195,8 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
CoordinatorStorageAddr: options.CoordinatorStorageAddr,
LeaderElection: options.LeaderElection,
HostControlPlaneAddr: options.HostControlPlaneAddr,
MultiplexerResources: DefaultMultiplexerResources,
MultiplexerCacheManager: newMultiplexerCacheManager(options),
}

// if yurthub is in local mode, certMgr and networkMgr are no need to start
Expand Down Expand Up @@ -403,3 +424,17 @@ func prepareServerServing(options *options.YurtHubOptions, certMgr certificate.Y

return nil
}

func newMultiplexerCacheManager(options *options.YurtHubOptions) multiplexer.MultiplexerManager {
config := newRestConfig(options.YurtHubProxyHost, options.YurtHubProxyPort)
rsm := storage.NewStorageManager(config)

return multiplexer.NewRequestsMultiplexerManager(rsm)
}

func newRestConfig(host string, port int) *rest.Config {
return &rest.Config{
Host: fmt.Sprintf("http://%s:%d", host, port),
UserAgent: util.MultiplexerProxyClientUserAgent,
}
}
34 changes: 34 additions & 0 deletions pkg/yurthub/filter/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ package filter
import (
"io"
"net/http"
"strings"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"

yurtutil "github.com/openyurtio/openyurt/pkg/util"
)

type NodesInPoolGetter func(poolName string) ([]string, error)
Expand Down Expand Up @@ -59,4 +62,35 @@ type ObjectFilter interface {
Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object
}

type FilterManager interface {
FindResponseFilter(req *http.Request) (ResponseFilter, bool)
FindObjectFilters(req *http.Request) ObjectFilter
}

type NodeGetter func(name string) (*v1.Node, error)

type UnionObjectFilter []ObjectFilter

func (chain UnionObjectFilter) Name() string {
var names []string
for i := range chain {
names = append(names, chain[i].Name())

Check warning on line 77 in pkg/yurthub/filter/interfaces.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/interfaces.go#L74-L77

Added lines #L74 - L77 were not covered by tests
}
return strings.Join(names, ",")

Check warning on line 79 in pkg/yurthub/filter/interfaces.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/interfaces.go#L79

Added line #L79 was not covered by tests
}

func (chain UnionObjectFilter) SupportedResourceAndVerbs() map[string]sets.Set[string] {

Check warning on line 82 in pkg/yurthub/filter/interfaces.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/interfaces.go#L82

Added line #L82 was not covered by tests
// do nothing
return map[string]sets.Set[string]{}

Check warning on line 84 in pkg/yurthub/filter/interfaces.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/interfaces.go#L84

Added line #L84 was not covered by tests
}

func (chain UnionObjectFilter) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object {
for i := range chain {
obj = chain[i].Filter(obj, stopCh)
if yurtutil.IsNil(obj) {
break

Check warning on line 91 in pkg/yurthub/filter/interfaces.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/interfaces.go#L87-L91

Added lines #L87 - L91 were not covered by tests
}
}

return obj

Check warning on line 95 in pkg/yurthub/filter/interfaces.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/interfaces.go#L95

Added line #L95 was not covered by tests
}
17 changes: 17 additions & 0 deletions pkg/yurthub/filter/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,20 @@ func (m *Manager) FindResponseFilter(req *http.Request) (filter.ResponseFilter,

return nil, false
}

func (m *Manager) FindObjectFilters(req *http.Request) filter.ObjectFilter {
objectFilters := make([]filter.ObjectFilter, 0)
approved, filterNames := m.Approver.Approve(req)
if !approved {
return nil

Check warning on line 119 in pkg/yurthub/filter/manager/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/manager/manager.go#L115-L119

Added lines #L115 - L119 were not covered by tests
}

for i := range filterNames {
if objectFilter, ok := m.nameToObjectFilter[filterNames[i]]; ok {
objectFilters = append(objectFilters, objectFilter)

Check warning on line 124 in pkg/yurthub/filter/manager/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/manager/manager.go#L122-L124

Added lines #L122 - L124 were not covered by tests
}
}

filters := filter.UnionObjectFilter(objectFilters)
return filters

Check warning on line 129 in pkg/yurthub/filter/manager/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/manager/manager.go#L128-L129

Added lines #L128 - L129 were not covered by tests
}
1 change: 0 additions & 1 deletion pkg/yurthub/multiplexer/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

type Interface interface {
Watch(ctx context.Context, key string, opts kstorage.ListOptions) (watch.Interface, error)
Get(ctx context.Context, key string, opts kstorage.GetOptions, objPtr runtime.Object) error
GetList(ctx context.Context, key string, opts kstorage.ListOptions, listObj runtime.Object) error
}

Expand Down
96 changes: 50 additions & 46 deletions pkg/yurthub/multiplexer/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,60 @@ var newServiceListFunc = func() runtime.Object {
}

func TestResourceCache_GetList(t *testing.T) {
cache, _, err := NewResourceCache(
ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")}),
storage := ystorage.NewFakeServiceStorage(
[]v1.Service{
*newService(metav1.NamespaceSystem, "coredns"),
*newService(metav1.NamespaceDefault, "nginx"),
})

cache, _, _ := NewResourceCache(
storage,
serviceGVR,
&ResourceCacheConfig{
keyFunc,
KeyFunc,
newServiceFunc,
newServiceListFunc,
attrsFunc,
AttrsFunc,
},
)

assert.Nil(t, err)
assertCacheGetList(t, cache)
for _, tc := range []struct {
name string
key string
expectedServiceList *v1.ServiceList
}{
{
"all namespace",
"",
&v1.ServiceList{
ListMeta: metav1.ListMeta{
ResourceVersion: "100",
},
Items: []v1.Service{
*newService(metav1.NamespaceDefault, "nginx"),
*newService(metav1.NamespaceSystem, "coredns"),
},
},
},
{
"default namespace",
"/default",
&v1.ServiceList{
ListMeta: metav1.ListMeta{
ResourceVersion: "100",
},
Items: []v1.Service{
*newService(metav1.NamespaceDefault, "nginx"),
},
},
},
} {
serviceList := &v1.ServiceList{}
err := cache.GetList(context.Background(), tc.key, mockListOptions(), serviceList)

assert.Nil(t, err)
assert.Equal(t, tc.expectedServiceList.Items, serviceList.Items)
}
}

func mockListOptions() storage.ListOptions {
Expand All @@ -74,27 +115,17 @@ func mockListOptions() storage.ListOptions {
}
}

func assertCacheGetList(t testing.TB, cache Interface) {
t.Helper()

serviceList := &v1.ServiceList{}
err := cache.GetList(context.Background(), "", mockListOptions(), serviceList)

assert.Nil(t, err)
assert.Equal(t, 1, len(serviceList.Items))
}

func TestResourceCache_Watch(t *testing.T) {
fakeStorage := ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")})

cache, _, err := NewResourceCache(
fakeStorage,
serviceGVR,
&ResourceCacheConfig{
keyFunc,
KeyFunc,
newServiceFunc,
newServiceListFunc,
attrsFunc,
AttrsFunc,
},
)

Expand All @@ -117,7 +148,7 @@ func mockWatchOptions() storage.ListOptions {
}

func assertCacheWatch(t testing.TB, cache Interface, fs *ystorage.FakeServiceStorage) {
receive, err := cache.Watch(context.TODO(), "", mockWatchOptions())
receive, err := cache.Watch(context.TODO(), "/kube-system", mockWatchOptions())

go func() {
fs.AddWatchObject(newService(metav1.NamespaceSystem, "coredns2"))
Expand All @@ -127,30 +158,3 @@ func assertCacheWatch(t testing.TB, cache Interface, fs *ystorage.FakeServiceSto
event := <-receive.ResultChan()
assert.Equal(t, watch.Added, event.Type)
}

func TestResourceCache_Get(t *testing.T) {
cache, _, err := NewResourceCache(
ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")}),
serviceGVR,
&ResourceCacheConfig{
keyFunc,
newServiceFunc,
newServiceListFunc,
attrsFunc,
},
)
assert.Nil(t, err)
assertCacheGet(t, cache)
}

func assertCacheGet(t testing.TB, cache Interface) {
t.Helper()

service := &v1.Service{}
err := cache.Get(context.Background(), "/kube-system/coredns", storage.GetOptions{
ResourceVersion: "1",
}, service)

assert.Nil(t, err)
assert.Equal(t, "coredns", service.Name)
}
39 changes: 39 additions & 0 deletions pkg/yurthub/multiplexer/fake_multiplexer_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
Copyright 2024 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package multiplexer

import "k8s.io/apimachinery/pkg/runtime/schema"

type FakeCacheManager struct {
cacheMap map[string]Interface
resourceConfigMap map[string]*ResourceCacheConfig
}

func NewFakeCacheManager(cacheMap map[string]Interface, resourceConfigMap map[string]*ResourceCacheConfig) *FakeCacheManager {
return &FakeCacheManager{
cacheMap: cacheMap,
resourceConfigMap: resourceConfigMap,

Check warning on line 29 in pkg/yurthub/multiplexer/fake_multiplexer_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/multiplexer/fake_multiplexer_manager.go#L26-L29

Added lines #L26 - L29 were not covered by tests
}
}

func (fcm *FakeCacheManager) ResourceCacheConfig(gvr *schema.GroupVersionResource) (*ResourceCacheConfig, error) {
return fcm.resourceConfigMap[gvr.String()], nil

Check warning on line 34 in pkg/yurthub/multiplexer/fake_multiplexer_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/multiplexer/fake_multiplexer_manager.go#L33-L34

Added lines #L33 - L34 were not covered by tests
}

func (fcm *FakeCacheManager) ResourceCache(gvr *schema.GroupVersionResource) (Interface, func(), error) {
return fcm.cacheMap[gvr.String()], nil, nil

Check warning on line 38 in pkg/yurthub/multiplexer/fake_multiplexer_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/multiplexer/fake_multiplexer_manager.go#L37-L38

Added lines #L37 - L38 were not covered by tests
}
10 changes: 4 additions & 6 deletions pkg/yurthub/multiplexer/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
ystorage "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage"
)

var keyFunc = func(obj runtime.Object) (string, error) {
var KeyFunc = func(obj runtime.Object) (string, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
return "", err

Check warning on line 36 in pkg/yurthub/multiplexer/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/multiplexer/manager.go#L36

Added line #L36 was not covered by tests
Expand All @@ -48,7 +48,7 @@ var keyFunc = func(obj runtime.Object) (string, error) {
return "/" + ns + "/" + name, nil
}

var attrsFunc = func(obj runtime.Object) (labels.Set, fields.Set, error) {
var AttrsFunc = func(obj runtime.Object) (labels.Set, fields.Set, error) {
metadata, err := meta.Accessor(obj)
if err != nil {
return nil, nil, err

Check warning on line 54 in pkg/yurthub/multiplexer/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/multiplexer/manager.go#L54

Added line #L54 was not covered by tests
Expand Down Expand Up @@ -93,7 +93,6 @@ func NewRequestsMultiplexerManager(
cacheDestroyFuncMap: make(map[string]func()),
}
}

func (m *multiplexerManager) ResourceCacheConfig(gvr *schema.GroupVersionResource) (*ResourceCacheConfig, error) {
if config, ok := m.cacheConfigMap[gvr.String()]; ok {
return config, nil

Check warning on line 98 in pkg/yurthub/multiplexer/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/multiplexer/manager.go#L98

Added line #L98 was not covered by tests
Expand Down Expand Up @@ -127,7 +126,6 @@ func (m *multiplexerManager) convertToGVK(gvr *schema.GroupVersionResource) (sch

func (m *multiplexerManager) newResourceCacheConfig(gvk schema.GroupVersionKind,
listGVK schema.GroupVersionKind) *ResourceCacheConfig {

resourceCacheConfig := &ResourceCacheConfig{
NewFunc: func() runtime.Object {
obj, _ := scheme.Scheme.New(gvk)
Expand All @@ -137,8 +135,8 @@ func (m *multiplexerManager) newResourceCacheConfig(gvk schema.GroupVersionKind,
objList, _ := scheme.Scheme.New(listGVK)
return objList
},
KeyFunc: keyFunc,
GetAttrsFunc: attrsFunc,
KeyFunc: KeyFunc,
GetAttrsFunc: AttrsFunc,
}

return resourceCacheConfig
Expand Down
Loading

0 comments on commit d6c4a47

Please sign in to comment.