Skip to content

Commit

Permalink
Implement multiplexer proxy. (#2141)
Browse files Browse the repository at this point in the history
* Implement multiplexer cache.
  • Loading branch information
zyjhtangtang authored Jan 2, 2025
1 parent ea999d4 commit e83f8ab
Show file tree
Hide file tree
Showing 41 changed files with 3,313 additions and 63 deletions.
35 changes: 35 additions & 0 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
apiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
Expand All @@ -54,11 +55,26 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/filter/manager"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/multiplexer"
"github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage"
"github.com/openyurtio/openyurt/pkg/yurthub/network"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

var AllowedMultiplexerResources = []schema.GroupVersionResource{
{
Group: "",
Version: "v1",
Resource: "services",
},
{
Group: "discovery.k8s.io",
Version: "v1",
Resource: "endpointslices",
},
}

// YurtHubConfiguration represents configuration of yurthub
type YurtHubConfiguration struct {
LBMode string
Expand Down Expand Up @@ -101,6 +117,9 @@ type YurtHubConfiguration struct {
CoordinatorClient kubernetes.Interface
LeaderElection componentbaseconfig.LeaderElectionConfiguration
HostControlPlaneAddr string // ip:port
PostStartHooks map[string]func() error
RequestMultiplexerManager multiplexer.MultiplexerManager
MultiplexerResources []schema.GroupVersionResource
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand Down Expand Up @@ -176,6 +195,8 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
CoordinatorStorageAddr: options.CoordinatorStorageAddr,
LeaderElection: options.LeaderElection,
HostControlPlaneAddr: options.HostControlPlaneAddr,
MultiplexerResources: AllowedMultiplexerResources,
RequestMultiplexerManager: newMultiplexerCacheManager(options),
}

// if yurthub is in local mode, certMgr and networkMgr are no need to start
Expand Down Expand Up @@ -403,3 +424,17 @@ func prepareServerServing(options *options.YurtHubOptions, certMgr certificate.Y

return nil
}

func newMultiplexerCacheManager(options *options.YurtHubOptions) multiplexer.MultiplexerManager {
config := newRestConfig(options.NodeName, options.YurtHubProxyHost, options.YurtHubProxyPort)
rsm := storage.NewStorageManager(config)

return multiplexer.NewRequestsMultiplexerManager(rsm)
}

func newRestConfig(nodeName string, host string, port int) *rest.Config {
return &rest.Config{
Host: fmt.Sprintf("http://%s:%d", host, port),
UserAgent: util.MultiplexerProxyClientUserAgentPrefix + nodeName,
}
}
2 changes: 1 addition & 1 deletion cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
var cacheMgr cachemanager.CacheManager
if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace)
cacheMgr = cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory)
cacheMgr = cachemanager.NewCacheManager(cfg.NodeName, cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory)
} else {
klog.Infof("%d. disable cache manager for node %s because it is a cloud node", trace, cfg.NodeName)
}
Expand Down
Binary file added cmd/yurthub/yurthub
Binary file not shown.
14 changes: 8 additions & 6 deletions pkg/yurthub/cachemanager/cache_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@ const (

type CacheAgent struct {
sync.Mutex
agents sets.Set[string]
store StorageWrapper
agents sets.Set[string]
store StorageWrapper
nodeName string
}

func NewCacheAgents(informerFactory informers.SharedInformerFactory, store StorageWrapper) *CacheAgent {
func NewCacheAgents(nodeName string, informerFactory informers.SharedInformerFactory, store StorageWrapper) *CacheAgent {
ca := &CacheAgent{
agents: sets.New(util.DefaultCacheAgents...),
store: store,
agents: sets.New(util.DefaultCacheAgents...).Insert(util.MultiplexerProxyClientUserAgentPrefix + nodeName),
store: store,
nodeName: nodeName,
}
configmapInformer := informerFactory.Core().V1().ConfigMaps().Informer()
configmapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -108,7 +110,7 @@ func (ca *CacheAgent) deleteConfigmap(obj interface{}) {

// updateCacheAgents update cache agents
func (ca *CacheAgent) updateCacheAgents(cacheAgents, action string) sets.Set[string] {
newAgents := sets.New(util.DefaultCacheAgents...)
newAgents := sets.New(util.DefaultCacheAgents...).Insert(util.MultiplexerProxyClientUserAgentPrefix + ca.nodeName)
for _, agent := range strings.Split(cacheAgents, sepForAgent) {
agent = strings.TrimSpace(agent)
if len(agent) != 0 {
Expand Down
13 changes: 7 additions & 6 deletions pkg/yurthub/cachemanager/cache_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,38 +36,39 @@ func TestUpdateCacheAgents(t *testing.T) {
"two new agents updated": {
initAgents: []string{},
cacheAgents: "agent1,agent2",
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...),
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"),
deletedAgents: sets.Set[string]{},
},
"two new agents updated but an old agent deleted": {
initAgents: []string{"agent1", "agent2"},
cacheAgents: "agent2,agent3",
resultAgents: sets.New(append([]string{"agent2", "agent3"}, util.DefaultCacheAgents...)...),
resultAgents: sets.New(append([]string{"agent2", "agent3"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"),
deletedAgents: sets.New("agent1"),
},
"no agents updated ": {
initAgents: []string{"agent1", "agent2"},
cacheAgents: "agent1,agent2",
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...),
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"),
deletedAgents: sets.New[string](),
},
"no agents updated with default": {
initAgents: []string{"agent1", "agent2", "kubelet"},
cacheAgents: "agent1,agent2",
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...),
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"),
deletedAgents: sets.New[string](),
},
"empty agents added ": {
initAgents: []string{},
cacheAgents: "",
resultAgents: sets.New(util.DefaultCacheAgents...),
resultAgents: sets.New(util.DefaultCacheAgents...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"),
deletedAgents: sets.New[string](),
},
}
for k, tt := range testcases {
t.Run(k, func(t *testing.T) {
m := &CacheAgent{
agents: sets.New(tt.initAgents...),
agents: sets.New(tt.initAgents...),
nodeName: "iz2ze21g5pq9jbesubrksvz",
}

m.updateCacheAgents(strings.Join(tt.initAgents, ","), "")
Expand Down
3 changes: 2 additions & 1 deletion pkg/yurthub/cachemanager/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,13 @@ type cacheManager struct {

// NewCacheManager creates a new CacheManager
func NewCacheManager(
nodeName string,
storagewrapper StorageWrapper,
serializerMgr *serializer.SerializerManager,
restMapperMgr *hubmeta.RESTMapperManager,
sharedFactory informers.SharedInformerFactory,
) CacheManager {
cacheAgents := NewCacheAgents(sharedFactory, storagewrapper)
cacheAgents := NewCacheAgents(nodeName, sharedFactory, storagewrapper)
cm := &cacheManager{
storage: storagewrapper,
serializerManager: serializerMgr,
Expand Down
12 changes: 6 additions & 6 deletions pkg/yurthub/cachemanager/cache_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestCacheGetResponse(t *testing.T) {
}
sWrapper := NewStorageWrapper(dStorage)
serializerM := serializer.NewSerializerManager()
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
inputObj runtime.Object
Expand Down Expand Up @@ -607,7 +607,7 @@ func TestCacheWatchResponse(t *testing.T) {
}
sWrapper := NewStorageWrapper(dStorage)
serializerM := serializer.NewSerializerManager()
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
inputObj []watch.Event
Expand Down Expand Up @@ -1014,7 +1014,7 @@ func TestCacheListResponse(t *testing.T) {
if err != nil {
t.Errorf("failed to create RESTMapper manager, %v", err)
}
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
inputObj runtime.Object
Expand Down Expand Up @@ -1607,7 +1607,7 @@ func TestQueryCacheForGet(t *testing.T) {
if err != nil {
t.Errorf("failed to create RESTMapper manager, %v", err)
}
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
keyBuildInfo storage.KeyBuildInfo
Expand Down Expand Up @@ -2334,7 +2334,7 @@ func TestQueryCacheForList(t *testing.T) {
if err != nil {
t.Errorf("failed to create RESTMapper manager, %v", err)
}
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
keyBuildInfo *storage.KeyBuildInfo
Expand Down Expand Up @@ -3158,7 +3158,7 @@ func TestCanCacheFor(t *testing.T) {
defer close(stop)
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
m := NewCacheManager(s, nil, nil, informerFactory)
m := NewCacheManager("node1", s, nil, nil, informerFactory)
informerFactory.Start(nil)
cache.WaitForCacheSync(stop, informerFactory.Core().V1().ConfigMaps().Informer().HasSynced)
if tt.preRequest != nil {
Expand Down
53 changes: 53 additions & 0 deletions pkg/yurthub/filter/filterchain/filterchain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
Copyright 2024 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package filterchain

import (
"strings"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"

yurtutil "github.com/openyurtio/openyurt/pkg/util"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
)

type FilterChain []filter.ObjectFilter

func (chain FilterChain) Name() string {
var names []string
for i := range chain {
names = append(names, chain[i].Name())
}
return strings.Join(names, ",")
}

func (chain FilterChain) SupportedResourceAndVerbs() map[string]sets.Set[string] {
// do nothing
return map[string]sets.Set[string]{}
}

func (chain FilterChain) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object {
for i := range chain {
obj = chain[i].Filter(obj, stopCh)
if yurtutil.IsNil(obj) {
break
}
}

return obj
}
5 changes: 5 additions & 0 deletions pkg/yurthub/filter/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,9 @@ type ObjectFilter interface {
Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object
}

type FilterFinder interface {
FindResponseFilter(req *http.Request) (ResponseFilter, bool)
FindObjectFilters(req *http.Request) ObjectFilter
}

type NodeGetter func(name string) (*v1.Node, error)
18 changes: 18 additions & 0 deletions pkg/yurthub/filter/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/approver"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/base"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/filterchain"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/responsefilter"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
Expand Down Expand Up @@ -111,3 +112,20 @@ func (m *Manager) FindResponseFilter(req *http.Request) (filter.ResponseFilter,

return nil, false
}

func (m *Manager) FindObjectFilters(req *http.Request) filter.ObjectFilter {
objectFilters := make([]filter.ObjectFilter, 0)
approved, filterNames := m.Approver.Approve(req)
if !approved {
return nil
}

for i := range filterNames {
if objectFilter, ok := m.nameToObjectFilter[filterNames[i]]; ok {
objectFilters = append(objectFilters, objectFilter)
}
}

filters := filterchain.FilterChain(objectFilters)
return filters
}
16 changes: 8 additions & 8 deletions pkg/yurthub/filter/servicetopology/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"context"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
discoveryv1 "k8s.io/api/discovery/v1"
discoveryV1beta1 "k8s.io/api/discovery/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -130,7 +130,7 @@ func (stf *serviceTopologyFilter) Filter(obj runtime.Object, stopCh <-chan struc
}

switch v := obj.(type) {
case *v1.Endpoints, *discoveryV1beta1.EndpointSlice, *discovery.EndpointSlice:
case *v1.Endpoints, *discoveryV1beta1.EndpointSlice, *discoveryv1.EndpointSlice:
return stf.serviceTopologyHandler(v)
default:
return obj
Expand Down Expand Up @@ -164,9 +164,9 @@ func (stf *serviceTopologyFilter) resolveServiceTopologyType(obj runtime.Object)
case *discoveryV1beta1.EndpointSlice:
svcNamespace = v.Namespace
svcName = v.Labels[discoveryV1beta1.LabelServiceName]
case *discovery.EndpointSlice:
case *discoveryv1.EndpointSlice:
svcNamespace = v.Namespace
svcName = v.Labels[discovery.LabelServiceName]
svcName = v.Labels[discoveryv1.LabelServiceName]
case *v1.Endpoints:
svcNamespace = v.Namespace
svcName = v.Name
Expand All @@ -190,7 +190,7 @@ func (stf *serviceTopologyFilter) nodeTopologyHandler(obj runtime.Object) runtim
switch v := obj.(type) {
case *discoveryV1beta1.EndpointSlice:
return reassembleV1beta1EndpointSlice(v, stf.nodeName, nil)
case *discovery.EndpointSlice:
case *discoveryv1.EndpointSlice:
return reassembleEndpointSlice(v, stf.nodeName, nil)
case *v1.Endpoints:
return reassembleEndpoints(v, stf.nodeName, nil)
Expand All @@ -215,7 +215,7 @@ func (stf *serviceTopologyFilter) nodePoolTopologyHandler(obj runtime.Object) ru
switch v := obj.(type) {
case *discoveryV1beta1.EndpointSlice:
return reassembleV1beta1EndpointSlice(v, "", nodes)
case *discovery.EndpointSlice:
case *discoveryv1.EndpointSlice:
return reassembleEndpointSlice(v, "", nodes)
case *v1.Endpoints:
return reassembleEndpoints(v, "", nodes)
Expand Down Expand Up @@ -252,13 +252,13 @@ func reassembleV1beta1EndpointSlice(endpointSlice *discoveryV1beta1.EndpointSlic
}

// reassembleEndpointSlice will discard endpoints that are not on the same node/nodePool for v1.EndpointSlice
func reassembleEndpointSlice(endpointSlice *discovery.EndpointSlice, nodeName string, nodes []string) *discovery.EndpointSlice {
func reassembleEndpointSlice(endpointSlice *discoveryv1.EndpointSlice, nodeName string, nodes []string) *discoveryv1.EndpointSlice {
if len(nodeName) != 0 && len(nodes) != 0 {
klog.Warningf("reassembleEndpointSlice: nodeName(%s) and nodePool can not be set at the same time", nodeName)
return endpointSlice
}

var newEps []discovery.Endpoint
var newEps []discoveryv1.Endpoint
for i := range endpointSlice.Endpoints {
if len(nodeName) != 0 {
if *endpointSlice.Endpoints[i].NodeName == nodeName {
Expand Down
Loading

0 comments on commit e83f8ab

Please sign in to comment.