Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement multiplexer proxy. #2141

Merged
merged 2 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
apiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
Expand All @@ -54,11 +55,26 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/filter/manager"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/multiplexer"
"github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage"
"github.com/openyurtio/openyurt/pkg/yurthub/network"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

var AllowedMultiplexerResources = []schema.GroupVersionResource{
{
Group: "",
Version: "v1",
Resource: "services",
},
{
Group: "discovery.k8s.io",
Version: "v1",
Resource: "endpointslices",
},
}

// YurtHubConfiguration represents configuration of yurthub
type YurtHubConfiguration struct {
LBMode string
Expand Down Expand Up @@ -101,6 +117,9 @@ type YurtHubConfiguration struct {
CoordinatorClient kubernetes.Interface
LeaderElection componentbaseconfig.LeaderElectionConfiguration
HostControlPlaneAddr string // ip:port
PostStartHooks map[string]func() error
RequestMultiplexerManager multiplexer.MultiplexerManager
MultiplexerResources []schema.GroupVersionResource
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand Down Expand Up @@ -176,6 +195,8 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
CoordinatorStorageAddr: options.CoordinatorStorageAddr,
LeaderElection: options.LeaderElection,
HostControlPlaneAddr: options.HostControlPlaneAddr,
MultiplexerResources: AllowedMultiplexerResources,
RequestMultiplexerManager: newMultiplexerCacheManager(options),
}

// if yurthub is in local mode, certMgr and networkMgr are no need to start
Expand Down Expand Up @@ -403,3 +424,17 @@ func prepareServerServing(options *options.YurtHubOptions, certMgr certificate.Y

return nil
}

func newMultiplexerCacheManager(options *options.YurtHubOptions) multiplexer.MultiplexerManager {
config := newRestConfig(options.NodeName, options.YurtHubProxyHost, options.YurtHubProxyPort)
rsm := storage.NewStorageManager(config)

return multiplexer.NewRequestsMultiplexerManager(rsm)
}

func newRestConfig(nodeName string, host string, port int) *rest.Config {
return &rest.Config{
Host: fmt.Sprintf("http://%s:%d", host, port),
UserAgent: util.MultiplexerProxyClientUserAgentPrefix + nodeName,
}
}
2 changes: 1 addition & 1 deletion cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@
var cacheMgr cachemanager.CacheManager
if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace)
cacheMgr = cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory)
cacheMgr = cachemanager.NewCacheManager(cfg.NodeName, cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory)

Check warning on line 139 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L139

Added line #L139 was not covered by tests
} else {
klog.Infof("%d. disable cache manager for node %s because it is a cloud node", trace, cfg.NodeName)
}
Expand Down
Binary file added cmd/yurthub/yurthub
Binary file not shown.
14 changes: 8 additions & 6 deletions pkg/yurthub/cachemanager/cache_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@ const (

type CacheAgent struct {
sync.Mutex
agents sets.Set[string]
store StorageWrapper
agents sets.Set[string]
store StorageWrapper
nodeName string
}

func NewCacheAgents(informerFactory informers.SharedInformerFactory, store StorageWrapper) *CacheAgent {
func NewCacheAgents(nodeName string, informerFactory informers.SharedInformerFactory, store StorageWrapper) *CacheAgent {
ca := &CacheAgent{
agents: sets.New(util.DefaultCacheAgents...),
store: store,
agents: sets.New(util.DefaultCacheAgents...).Insert(util.MultiplexerProxyClientUserAgentPrefix + nodeName),
store: store,
nodeName: nodeName,
}
configmapInformer := informerFactory.Core().V1().ConfigMaps().Informer()
configmapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -108,7 +110,7 @@ func (ca *CacheAgent) deleteConfigmap(obj interface{}) {

// updateCacheAgents update cache agents
func (ca *CacheAgent) updateCacheAgents(cacheAgents, action string) sets.Set[string] {
newAgents := sets.New(util.DefaultCacheAgents...)
newAgents := sets.New(util.DefaultCacheAgents...).Insert(util.MultiplexerProxyClientUserAgentPrefix + ca.nodeName)
for _, agent := range strings.Split(cacheAgents, sepForAgent) {
agent = strings.TrimSpace(agent)
if len(agent) != 0 {
Expand Down
13 changes: 7 additions & 6 deletions pkg/yurthub/cachemanager/cache_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,38 +36,39 @@ func TestUpdateCacheAgents(t *testing.T) {
"two new agents updated": {
initAgents: []string{},
cacheAgents: "agent1,agent2",
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...),
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"),
deletedAgents: sets.Set[string]{},
},
"two new agents updated but an old agent deleted": {
initAgents: []string{"agent1", "agent2"},
cacheAgents: "agent2,agent3",
resultAgents: sets.New(append([]string{"agent2", "agent3"}, util.DefaultCacheAgents...)...),
resultAgents: sets.New(append([]string{"agent2", "agent3"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"),
deletedAgents: sets.New("agent1"),
},
"no agents updated ": {
initAgents: []string{"agent1", "agent2"},
cacheAgents: "agent1,agent2",
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...),
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"),
deletedAgents: sets.New[string](),
},
"no agents updated with default": {
initAgents: []string{"agent1", "agent2", "kubelet"},
cacheAgents: "agent1,agent2",
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...),
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"),
deletedAgents: sets.New[string](),
},
"empty agents added ": {
initAgents: []string{},
cacheAgents: "",
resultAgents: sets.New(util.DefaultCacheAgents...),
resultAgents: sets.New(util.DefaultCacheAgents...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"),
deletedAgents: sets.New[string](),
},
}
for k, tt := range testcases {
t.Run(k, func(t *testing.T) {
m := &CacheAgent{
agents: sets.New(tt.initAgents...),
agents: sets.New(tt.initAgents...),
nodeName: "iz2ze21g5pq9jbesubrksvz",
}

m.updateCacheAgents(strings.Join(tt.initAgents, ","), "")
Expand Down
3 changes: 2 additions & 1 deletion pkg/yurthub/cachemanager/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,13 @@ type cacheManager struct {

// NewCacheManager creates a new CacheManager
func NewCacheManager(
nodeName string,
storagewrapper StorageWrapper,
serializerMgr *serializer.SerializerManager,
restMapperMgr *hubmeta.RESTMapperManager,
sharedFactory informers.SharedInformerFactory,
) CacheManager {
cacheAgents := NewCacheAgents(sharedFactory, storagewrapper)
cacheAgents := NewCacheAgents(nodeName, sharedFactory, storagewrapper)
cm := &cacheManager{
storage: storagewrapper,
serializerManager: serializerMgr,
Expand Down
12 changes: 6 additions & 6 deletions pkg/yurthub/cachemanager/cache_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestCacheGetResponse(t *testing.T) {
}
sWrapper := NewStorageWrapper(dStorage)
serializerM := serializer.NewSerializerManager()
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
inputObj runtime.Object
Expand Down Expand Up @@ -607,7 +607,7 @@ func TestCacheWatchResponse(t *testing.T) {
}
sWrapper := NewStorageWrapper(dStorage)
serializerM := serializer.NewSerializerManager()
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
inputObj []watch.Event
Expand Down Expand Up @@ -1014,7 +1014,7 @@ func TestCacheListResponse(t *testing.T) {
if err != nil {
t.Errorf("failed to create RESTMapper manager, %v", err)
}
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
inputObj runtime.Object
Expand Down Expand Up @@ -1607,7 +1607,7 @@ func TestQueryCacheForGet(t *testing.T) {
if err != nil {
t.Errorf("failed to create RESTMapper manager, %v", err)
}
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
keyBuildInfo storage.KeyBuildInfo
Expand Down Expand Up @@ -2334,7 +2334,7 @@ func TestQueryCacheForList(t *testing.T) {
if err != nil {
t.Errorf("failed to create RESTMapper manager, %v", err)
}
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
keyBuildInfo *storage.KeyBuildInfo
Expand Down Expand Up @@ -3158,7 +3158,7 @@ func TestCanCacheFor(t *testing.T) {
defer close(stop)
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
m := NewCacheManager(s, nil, nil, informerFactory)
m := NewCacheManager("node1", s, nil, nil, informerFactory)
informerFactory.Start(nil)
cache.WaitForCacheSync(stop, informerFactory.Core().V1().ConfigMaps().Informer().HasSynced)
if tt.preRequest != nil {
Expand Down
53 changes: 53 additions & 0 deletions pkg/yurthub/filter/filterchain/filterchain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
Copyright 2024 The OpenYurt Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package filterchain
rambohe-ch marked this conversation as resolved.
Show resolved Hide resolved

import (
"strings"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"

yurtutil "github.com/openyurtio/openyurt/pkg/util"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
)

type FilterChain []filter.ObjectFilter

func (chain FilterChain) Name() string {
var names []string
for i := range chain {
names = append(names, chain[i].Name())

Check warning on line 34 in pkg/yurthub/filter/filterchain/filterchain.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/filterchain/filterchain.go#L31-L34

Added lines #L31 - L34 were not covered by tests
}
return strings.Join(names, ",")

Check warning on line 36 in pkg/yurthub/filter/filterchain/filterchain.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/filterchain/filterchain.go#L36

Added line #L36 was not covered by tests
}

func (chain FilterChain) SupportedResourceAndVerbs() map[string]sets.Set[string] {

Check warning on line 39 in pkg/yurthub/filter/filterchain/filterchain.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/filterchain/filterchain.go#L39

Added line #L39 was not covered by tests
// do nothing
return map[string]sets.Set[string]{}

Check warning on line 41 in pkg/yurthub/filter/filterchain/filterchain.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/filterchain/filterchain.go#L41

Added line #L41 was not covered by tests
}

func (chain FilterChain) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object {
for i := range chain {
obj = chain[i].Filter(obj, stopCh)
if yurtutil.IsNil(obj) {
break

Check warning on line 48 in pkg/yurthub/filter/filterchain/filterchain.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/filterchain/filterchain.go#L44-L48

Added lines #L44 - L48 were not covered by tests
}
}

return obj

Check warning on line 52 in pkg/yurthub/filter/filterchain/filterchain.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/filterchain/filterchain.go#L52

Added line #L52 was not covered by tests
}
5 changes: 5 additions & 0 deletions pkg/yurthub/filter/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,9 @@ type ObjectFilter interface {
Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object
}

type FilterFinder interface {
FindResponseFilter(req *http.Request) (ResponseFilter, bool)
FindObjectFilters(req *http.Request) ObjectFilter
}

type NodeGetter func(name string) (*v1.Node, error)
18 changes: 18 additions & 0 deletions pkg/yurthub/filter/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/approver"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/base"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/filterchain"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/responsefilter"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
Expand Down Expand Up @@ -111,3 +112,20 @@

return nil, false
}

func (m *Manager) FindObjectFilters(req *http.Request) filter.ObjectFilter {
objectFilters := make([]filter.ObjectFilter, 0)
approved, filterNames := m.Approver.Approve(req)
if !approved {
return nil

Check warning on line 120 in pkg/yurthub/filter/manager/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/manager/manager.go#L116-L120

Added lines #L116 - L120 were not covered by tests
}

for i := range filterNames {
if objectFilter, ok := m.nameToObjectFilter[filterNames[i]]; ok {
objectFilters = append(objectFilters, objectFilter)

Check warning on line 125 in pkg/yurthub/filter/manager/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/manager/manager.go#L123-L125

Added lines #L123 - L125 were not covered by tests
}
}

filters := filterchain.FilterChain(objectFilters)
return filters

Check warning on line 130 in pkg/yurthub/filter/manager/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/yurthub/filter/manager/manager.go#L129-L130

Added lines #L129 - L130 were not covered by tests
}
16 changes: 8 additions & 8 deletions pkg/yurthub/filter/servicetopology/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"context"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
discoveryv1 "k8s.io/api/discovery/v1"
discoveryV1beta1 "k8s.io/api/discovery/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -130,7 +130,7 @@ func (stf *serviceTopologyFilter) Filter(obj runtime.Object, stopCh <-chan struc
}

switch v := obj.(type) {
case *v1.Endpoints, *discoveryV1beta1.EndpointSlice, *discovery.EndpointSlice:
case *v1.Endpoints, *discoveryV1beta1.EndpointSlice, *discoveryv1.EndpointSlice:
return stf.serviceTopologyHandler(v)
default:
return obj
Expand Down Expand Up @@ -164,9 +164,9 @@ func (stf *serviceTopologyFilter) resolveServiceTopologyType(obj runtime.Object)
case *discoveryV1beta1.EndpointSlice:
svcNamespace = v.Namespace
svcName = v.Labels[discoveryV1beta1.LabelServiceName]
case *discovery.EndpointSlice:
case *discoveryv1.EndpointSlice:
svcNamespace = v.Namespace
svcName = v.Labels[discovery.LabelServiceName]
svcName = v.Labels[discoveryv1.LabelServiceName]
case *v1.Endpoints:
svcNamespace = v.Namespace
svcName = v.Name
Expand All @@ -190,7 +190,7 @@ func (stf *serviceTopologyFilter) nodeTopologyHandler(obj runtime.Object) runtim
switch v := obj.(type) {
case *discoveryV1beta1.EndpointSlice:
return reassembleV1beta1EndpointSlice(v, stf.nodeName, nil)
case *discovery.EndpointSlice:
case *discoveryv1.EndpointSlice:
return reassembleEndpointSlice(v, stf.nodeName, nil)
case *v1.Endpoints:
return reassembleEndpoints(v, stf.nodeName, nil)
Expand All @@ -215,7 +215,7 @@ func (stf *serviceTopologyFilter) nodePoolTopologyHandler(obj runtime.Object) ru
switch v := obj.(type) {
case *discoveryV1beta1.EndpointSlice:
return reassembleV1beta1EndpointSlice(v, "", nodes)
case *discovery.EndpointSlice:
case *discoveryv1.EndpointSlice:
return reassembleEndpointSlice(v, "", nodes)
case *v1.Endpoints:
return reassembleEndpoints(v, "", nodes)
Expand Down Expand Up @@ -252,13 +252,13 @@ func reassembleV1beta1EndpointSlice(endpointSlice *discoveryV1beta1.EndpointSlic
}

// reassembleEndpointSlice will discard endpoints that are not on the same node/nodePool for v1.EndpointSlice
func reassembleEndpointSlice(endpointSlice *discovery.EndpointSlice, nodeName string, nodes []string) *discovery.EndpointSlice {
func reassembleEndpointSlice(endpointSlice *discoveryv1.EndpointSlice, nodeName string, nodes []string) *discoveryv1.EndpointSlice {
if len(nodeName) != 0 && len(nodes) != 0 {
klog.Warningf("reassembleEndpointSlice: nodeName(%s) and nodePool can not be set at the same time", nodeName)
return endpointSlice
}

var newEps []discovery.Endpoint
var newEps []discoveryv1.Endpoint
for i := range endpointSlice.Endpoints {
if len(nodeName) != 0 {
if *endpointSlice.Endpoints[i].NodeName == nodeName {
Expand Down
Loading
Loading