diff --git a/cmd/yurthub/app/config/config.go b/cmd/yurthub/app/config/config.go index 1969445ba22..8c605bfc63d 100644 --- a/cmd/yurthub/app/config/config.go +++ b/cmd/yurthub/app/config/config.go @@ -29,6 +29,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" apiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/dynamiccertificates" @@ -54,11 +55,26 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/filter/manager" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" + "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer" + "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage" "github.com/openyurtio/openyurt/pkg/yurthub/network" "github.com/openyurtio/openyurt/pkg/yurthub/storage/disk" "github.com/openyurtio/openyurt/pkg/yurthub/util" ) +var AllowedMultiplexerResources = []schema.GroupVersionResource{ + { + Group: "", + Version: "v1", + Resource: "services", + }, + { + Group: "discovery.k8s.io", + Version: "v1", + Resource: "endpointslices", + }, +} + // YurtHubConfiguration represents configuration of yurthub type YurtHubConfiguration struct { LBMode string @@ -101,6 +117,9 @@ type YurtHubConfiguration struct { CoordinatorClient kubernetes.Interface LeaderElection componentbaseconfig.LeaderElectionConfiguration HostControlPlaneAddr string // ip:port + PostStartHooks map[string]func() error + RequestMultiplexerManager multiplexer.MultiplexerManager + MultiplexerResources []schema.GroupVersionResource } // Complete converts *options.YurtHubOptions to *YurtHubConfiguration @@ -176,6 +195,8 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { CoordinatorStorageAddr: options.CoordinatorStorageAddr, LeaderElection: options.LeaderElection, HostControlPlaneAddr: options.HostControlPlaneAddr, + MultiplexerResources: AllowedMultiplexerResources, + RequestMultiplexerManager: newMultiplexerCacheManager(options), } // if yurthub is in local mode, certMgr and networkMgr are no need to start @@ -403,3 +424,17 @@ func prepareServerServing(options *options.YurtHubOptions, certMgr certificate.Y return nil } + +func newMultiplexerCacheManager(options *options.YurtHubOptions) multiplexer.MultiplexerManager { + config := newRestConfig(options.NodeName, options.YurtHubProxyHost, options.YurtHubProxyPort) + rsm := storage.NewStorageManager(config) + + return multiplexer.NewRequestsMultiplexerManager(rsm) +} + +func newRestConfig(nodeName string, host string, port int) *rest.Config { + return &rest.Config{ + Host: fmt.Sprintf("http://%s:%d", host, port), + UserAgent: util.MultiplexerProxyClientUserAgentPrefix + nodeName, + } +} diff --git a/cmd/yurthub/app/start.go b/cmd/yurthub/app/start.go index 792419fb28e..21969d3855c 100644 --- a/cmd/yurthub/app/start.go +++ b/cmd/yurthub/app/start.go @@ -136,7 +136,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error { var cacheMgr cachemanager.CacheManager if cfg.WorkingMode == util.WorkingModeEdge { klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace) - cacheMgr = cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory) + cacheMgr = cachemanager.NewCacheManager(cfg.NodeName, cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory) } else { klog.Infof("%d. disable cache manager for node %s because it is a cloud node", trace, cfg.NodeName) } diff --git a/cmd/yurthub/yurthub b/cmd/yurthub/yurthub new file mode 100755 index 00000000000..3633deb3c69 Binary files /dev/null and b/cmd/yurthub/yurthub differ diff --git a/pkg/yurthub/cachemanager/cache_agent.go b/pkg/yurthub/cachemanager/cache_agent.go index 7c7abe25656..ba17b8cfd16 100644 --- a/pkg/yurthub/cachemanager/cache_agent.go +++ b/pkg/yurthub/cachemanager/cache_agent.go @@ -35,14 +35,16 @@ const ( type CacheAgent struct { sync.Mutex - agents sets.Set[string] - store StorageWrapper + agents sets.Set[string] + store StorageWrapper + nodeName string } -func NewCacheAgents(informerFactory informers.SharedInformerFactory, store StorageWrapper) *CacheAgent { +func NewCacheAgents(nodeName string, informerFactory informers.SharedInformerFactory, store StorageWrapper) *CacheAgent { ca := &CacheAgent{ - agents: sets.New(util.DefaultCacheAgents...), - store: store, + agents: sets.New(util.DefaultCacheAgents...).Insert(util.MultiplexerProxyClientUserAgentPrefix + nodeName), + store: store, + nodeName: nodeName, } configmapInformer := informerFactory.Core().V1().ConfigMaps().Informer() configmapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -108,7 +110,7 @@ func (ca *CacheAgent) deleteConfigmap(obj interface{}) { // updateCacheAgents update cache agents func (ca *CacheAgent) updateCacheAgents(cacheAgents, action string) sets.Set[string] { - newAgents := sets.New(util.DefaultCacheAgents...) + newAgents := sets.New(util.DefaultCacheAgents...).Insert(util.MultiplexerProxyClientUserAgentPrefix + ca.nodeName) for _, agent := range strings.Split(cacheAgents, sepForAgent) { agent = strings.TrimSpace(agent) if len(agent) != 0 { diff --git a/pkg/yurthub/cachemanager/cache_agent_test.go b/pkg/yurthub/cachemanager/cache_agent_test.go index 29016c2fe58..bf38d001e28 100644 --- a/pkg/yurthub/cachemanager/cache_agent_test.go +++ b/pkg/yurthub/cachemanager/cache_agent_test.go @@ -36,38 +36,39 @@ func TestUpdateCacheAgents(t *testing.T) { "two new agents updated": { initAgents: []string{}, cacheAgents: "agent1,agent2", - resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...), + resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"), deletedAgents: sets.Set[string]{}, }, "two new agents updated but an old agent deleted": { initAgents: []string{"agent1", "agent2"}, cacheAgents: "agent2,agent3", - resultAgents: sets.New(append([]string{"agent2", "agent3"}, util.DefaultCacheAgents...)...), + resultAgents: sets.New(append([]string{"agent2", "agent3"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"), deletedAgents: sets.New("agent1"), }, "no agents updated ": { initAgents: []string{"agent1", "agent2"}, cacheAgents: "agent1,agent2", - resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...), + resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"), deletedAgents: sets.New[string](), }, "no agents updated with default": { initAgents: []string{"agent1", "agent2", "kubelet"}, cacheAgents: "agent1,agent2", - resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...), + resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"), deletedAgents: sets.New[string](), }, "empty agents added ": { initAgents: []string{}, cacheAgents: "", - resultAgents: sets.New(util.DefaultCacheAgents...), + resultAgents: sets.New(util.DefaultCacheAgents...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"), deletedAgents: sets.New[string](), }, } for k, tt := range testcases { t.Run(k, func(t *testing.T) { m := &CacheAgent{ - agents: sets.New(tt.initAgents...), + agents: sets.New(tt.initAgents...), + nodeName: "iz2ze21g5pq9jbesubrksvz", } m.updateCacheAgents(strings.Join(tt.initAgents, ","), "") diff --git a/pkg/yurthub/cachemanager/cache_manager.go b/pkg/yurthub/cachemanager/cache_manager.go index 0c91b13e262..9c976059de1 100644 --- a/pkg/yurthub/cachemanager/cache_manager.go +++ b/pkg/yurthub/cachemanager/cache_manager.go @@ -84,12 +84,13 @@ type cacheManager struct { // NewCacheManager creates a new CacheManager func NewCacheManager( + nodeName string, storagewrapper StorageWrapper, serializerMgr *serializer.SerializerManager, restMapperMgr *hubmeta.RESTMapperManager, sharedFactory informers.SharedInformerFactory, ) CacheManager { - cacheAgents := NewCacheAgents(sharedFactory, storagewrapper) + cacheAgents := NewCacheAgents(nodeName, sharedFactory, storagewrapper) cm := &cacheManager{ storage: storagewrapper, serializerManager: serializerMgr, diff --git a/pkg/yurthub/cachemanager/cache_manager_test.go b/pkg/yurthub/cachemanager/cache_manager_test.go index 5287fe74255..3f06abcd079 100644 --- a/pkg/yurthub/cachemanager/cache_manager_test.go +++ b/pkg/yurthub/cachemanager/cache_manager_test.go @@ -70,7 +70,7 @@ func TestCacheGetResponse(t *testing.T) { } sWrapper := NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory) + yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory) testcases := map[string]struct { inputObj runtime.Object @@ -607,7 +607,7 @@ func TestCacheWatchResponse(t *testing.T) { } sWrapper := NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory) + yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory) testcases := map[string]struct { inputObj []watch.Event @@ -1014,7 +1014,7 @@ func TestCacheListResponse(t *testing.T) { if err != nil { t.Errorf("failed to create RESTMapper manager, %v", err) } - yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory) + yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory) testcases := map[string]struct { inputObj runtime.Object @@ -1607,7 +1607,7 @@ func TestQueryCacheForGet(t *testing.T) { if err != nil { t.Errorf("failed to create RESTMapper manager, %v", err) } - yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory) + yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory) testcases := map[string]struct { keyBuildInfo storage.KeyBuildInfo @@ -2334,7 +2334,7 @@ func TestQueryCacheForList(t *testing.T) { if err != nil { t.Errorf("failed to create RESTMapper manager, %v", err) } - yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory) + yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory) testcases := map[string]struct { keyBuildInfo *storage.KeyBuildInfo @@ -3158,7 +3158,7 @@ func TestCanCacheFor(t *testing.T) { defer close(stop) client := fake.NewSimpleClientset() informerFactory := informers.NewSharedInformerFactory(client, 0) - m := NewCacheManager(s, nil, nil, informerFactory) + m := NewCacheManager("node1", s, nil, nil, informerFactory) informerFactory.Start(nil) cache.WaitForCacheSync(stop, informerFactory.Core().V1().ConfigMaps().Informer().HasSynced) if tt.preRequest != nil { diff --git a/pkg/yurthub/filter/filterchain/filterchain.go b/pkg/yurthub/filter/filterchain/filterchain.go new file mode 100644 index 00000000000..2c3270af7f9 --- /dev/null +++ b/pkg/yurthub/filter/filterchain/filterchain.go @@ -0,0 +1,53 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filterchain + +import ( + "strings" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" + + yurtutil "github.com/openyurtio/openyurt/pkg/util" + "github.com/openyurtio/openyurt/pkg/yurthub/filter" +) + +type FilterChain []filter.ObjectFilter + +func (chain FilterChain) Name() string { + var names []string + for i := range chain { + names = append(names, chain[i].Name()) + } + return strings.Join(names, ",") +} + +func (chain FilterChain) SupportedResourceAndVerbs() map[string]sets.Set[string] { + // do nothing + return map[string]sets.Set[string]{} +} + +func (chain FilterChain) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object { + for i := range chain { + obj = chain[i].Filter(obj, stopCh) + if yurtutil.IsNil(obj) { + break + } + } + + return obj +} diff --git a/pkg/yurthub/filter/interfaces.go b/pkg/yurthub/filter/interfaces.go index ca1ca83528c..9770e887398 100644 --- a/pkg/yurthub/filter/interfaces.go +++ b/pkg/yurthub/filter/interfaces.go @@ -59,4 +59,9 @@ type ObjectFilter interface { Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object } +type FilterFinder interface { + FindResponseFilter(req *http.Request) (ResponseFilter, bool) + FindObjectFilters(req *http.Request) ObjectFilter +} + type NodeGetter func(name string) (*v1.Node, error) diff --git a/pkg/yurthub/filter/manager/manager.go b/pkg/yurthub/filter/manager/manager.go index c5bd712e641..10ab21eb2ea 100644 --- a/pkg/yurthub/filter/manager/manager.go +++ b/pkg/yurthub/filter/manager/manager.go @@ -29,6 +29,7 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/filter" "github.com/openyurtio/openyurt/pkg/yurthub/filter/approver" "github.com/openyurtio/openyurt/pkg/yurthub/filter/base" + "github.com/openyurtio/openyurt/pkg/yurthub/filter/filterchain" "github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer" "github.com/openyurtio/openyurt/pkg/yurthub/filter/responsefilter" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" @@ -111,3 +112,20 @@ func (m *Manager) FindResponseFilter(req *http.Request) (filter.ResponseFilter, return nil, false } + +func (m *Manager) FindObjectFilters(req *http.Request) filter.ObjectFilter { + objectFilters := make([]filter.ObjectFilter, 0) + approved, filterNames := m.Approver.Approve(req) + if !approved { + return nil + } + + for i := range filterNames { + if objectFilter, ok := m.nameToObjectFilter[filterNames[i]]; ok { + objectFilters = append(objectFilters, objectFilter) + } + } + + filters := filterchain.FilterChain(objectFilters) + return filters +} diff --git a/pkg/yurthub/filter/servicetopology/filter.go b/pkg/yurthub/filter/servicetopology/filter.go index 04d91fb4743..ac32e9c85f6 100644 --- a/pkg/yurthub/filter/servicetopology/filter.go +++ b/pkg/yurthub/filter/servicetopology/filter.go @@ -20,7 +20,7 @@ import ( "context" v1 "k8s.io/api/core/v1" - discovery "k8s.io/api/discovery/v1" + discoveryv1 "k8s.io/api/discovery/v1" discoveryV1beta1 "k8s.io/api/discovery/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -130,7 +130,7 @@ func (stf *serviceTopologyFilter) Filter(obj runtime.Object, stopCh <-chan struc } switch v := obj.(type) { - case *v1.Endpoints, *discoveryV1beta1.EndpointSlice, *discovery.EndpointSlice: + case *v1.Endpoints, *discoveryV1beta1.EndpointSlice, *discoveryv1.EndpointSlice: return stf.serviceTopologyHandler(v) default: return obj @@ -164,9 +164,9 @@ func (stf *serviceTopologyFilter) resolveServiceTopologyType(obj runtime.Object) case *discoveryV1beta1.EndpointSlice: svcNamespace = v.Namespace svcName = v.Labels[discoveryV1beta1.LabelServiceName] - case *discovery.EndpointSlice: + case *discoveryv1.EndpointSlice: svcNamespace = v.Namespace - svcName = v.Labels[discovery.LabelServiceName] + svcName = v.Labels[discoveryv1.LabelServiceName] case *v1.Endpoints: svcNamespace = v.Namespace svcName = v.Name @@ -190,7 +190,7 @@ func (stf *serviceTopologyFilter) nodeTopologyHandler(obj runtime.Object) runtim switch v := obj.(type) { case *discoveryV1beta1.EndpointSlice: return reassembleV1beta1EndpointSlice(v, stf.nodeName, nil) - case *discovery.EndpointSlice: + case *discoveryv1.EndpointSlice: return reassembleEndpointSlice(v, stf.nodeName, nil) case *v1.Endpoints: return reassembleEndpoints(v, stf.nodeName, nil) @@ -215,7 +215,7 @@ func (stf *serviceTopologyFilter) nodePoolTopologyHandler(obj runtime.Object) ru switch v := obj.(type) { case *discoveryV1beta1.EndpointSlice: return reassembleV1beta1EndpointSlice(v, "", nodes) - case *discovery.EndpointSlice: + case *discoveryv1.EndpointSlice: return reassembleEndpointSlice(v, "", nodes) case *v1.Endpoints: return reassembleEndpoints(v, "", nodes) @@ -252,13 +252,13 @@ func reassembleV1beta1EndpointSlice(endpointSlice *discoveryV1beta1.EndpointSlic } // reassembleEndpointSlice will discard endpoints that are not on the same node/nodePool for v1.EndpointSlice -func reassembleEndpointSlice(endpointSlice *discovery.EndpointSlice, nodeName string, nodes []string) *discovery.EndpointSlice { +func reassembleEndpointSlice(endpointSlice *discoveryv1.EndpointSlice, nodeName string, nodes []string) *discoveryv1.EndpointSlice { if len(nodeName) != 0 && len(nodes) != 0 { klog.Warningf("reassembleEndpointSlice: nodeName(%s) and nodePool can not be set at the same time", nodeName) return endpointSlice } - var newEps []discovery.Endpoint + var newEps []discoveryv1.Endpoint for i := range endpointSlice.Endpoints { if len(nodeName) != 0 { if *endpointSlice.Endpoints[i].NodeName == nodeName { diff --git a/pkg/yurthub/multiplexer/cache.go b/pkg/yurthub/multiplexer/cache.go new file mode 100644 index 00000000000..44155427488 --- /dev/null +++ b/pkg/yurthub/multiplexer/cache.go @@ -0,0 +1,73 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import ( + "context" + "fmt" + "sync" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + kstorage "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/cacher" + "k8s.io/client-go/kubernetes/scheme" +) + +type Interface interface { + Watch(ctx context.Context, key string, opts kstorage.ListOptions) (watch.Interface, error) + GetList(ctx context.Context, key string, opts kstorage.ListOptions, listObj runtime.Object) error +} + +type ResourceCacheConfig struct { + KeyFunc func(runtime.Object) (string, error) + NewFunc func() runtime.Object + NewListFunc func() runtime.Object + GetAttrsFunc kstorage.AttrFunc +} + +func NewResourceCache( + s kstorage.Interface, + resource *schema.GroupVersionResource, + config *ResourceCacheConfig) (Interface, func(), error) { + + cacheConfig := cacher.Config{ + Storage: s, + Versioner: kstorage.APIObjectVersioner{}, + GroupResource: resource.GroupResource(), + KeyFunc: config.KeyFunc, + NewFunc: config.NewFunc, + NewListFunc: config.NewListFunc, + GetAttrsFunc: config.GetAttrsFunc, + Codec: scheme.Codecs.LegacyCodec(resource.GroupVersion()), + } + + cacher, err := cacher.NewCacherFromConfig(cacheConfig) + if err != nil { + return nil, func() {}, fmt.Errorf("failed to new cacher from config, error: %v", err) + } + + var once sync.Once + destroyFunc := func() { + once.Do(func() { + cacher.Stop() + }) + } + + return cacher, destroyFunc, nil +} diff --git a/pkg/yurthub/multiplexer/cache_test.go b/pkg/yurthub/multiplexer/cache_test.go new file mode 100644 index 00000000000..af2c4ef3d1e --- /dev/null +++ b/pkg/yurthub/multiplexer/cache_test.go @@ -0,0 +1,160 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/storage" + + ystorage "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage" +) + +var serviceGVR = &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "services", +} + +var newServiceFunc = func() runtime.Object { + return &v1.Service{} +} + +var newServiceListFunc = func() runtime.Object { + return &v1.ServiceList{} +} + +func TestResourceCache_GetList(t *testing.T) { + storage := ystorage.NewFakeServiceStorage( + []v1.Service{ + *newService(metav1.NamespaceSystem, "coredns"), + *newService(metav1.NamespaceDefault, "nginx"), + }) + + cache, _, _ := NewResourceCache( + storage, + serviceGVR, + &ResourceCacheConfig{ + KeyFunc, + newServiceFunc, + newServiceListFunc, + AttrsFunc, + }, + ) + + for _, tc := range []struct { + name string + key string + expectedServiceList *v1.ServiceList + }{ + { + "all namespace", + "", + &v1.ServiceList{ + ListMeta: metav1.ListMeta{ + ResourceVersion: "100", + }, + Items: []v1.Service{ + *newService(metav1.NamespaceDefault, "nginx"), + *newService(metav1.NamespaceSystem, "coredns"), + }, + }, + }, + { + "default namespace", + "/default", + &v1.ServiceList{ + ListMeta: metav1.ListMeta{ + ResourceVersion: "100", + }, + Items: []v1.Service{ + *newService(metav1.NamespaceDefault, "nginx"), + }, + }, + }, + } { + serviceList := &v1.ServiceList{} + err := cache.GetList(context.Background(), tc.key, mockListOptions(), serviceList) + + assert.Nil(t, err) + assert.Equal(t, tc.expectedServiceList.Items, serviceList.Items) + } +} + +func mockListOptions() storage.ListOptions { + return storage.ListOptions{ + ResourceVersion: "100", + Recursive: true, + Predicate: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + }, + } +} + +func TestResourceCache_Watch(t *testing.T) { + fakeStorage := ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")}) + + cache, _, err := NewResourceCache( + fakeStorage, + serviceGVR, + &ResourceCacheConfig{ + KeyFunc, + newServiceFunc, + newServiceListFunc, + AttrsFunc, + }, + ) + + assert.Nil(t, err) + assertCacheWatch(t, cache, fakeStorage) +} + +func mockWatchOptions() storage.ListOptions { + var sendInitialEvents = true + + return storage.ListOptions{ + ResourceVersion: "100", + Predicate: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + }, + Recursive: true, + SendInitialEvents: &sendInitialEvents, + } +} + +func assertCacheWatch(t testing.TB, cache Interface, fs *ystorage.FakeServiceStorage) { + receive, err := cache.Watch(context.TODO(), "/kube-system", mockWatchOptions()) + + go func() { + fs.AddWatchObject(newService(metav1.NamespaceSystem, "coredns2")) + }() + + assert.Nil(t, err) + event := <-receive.ResultChan() + assert.Equal(t, watch.Added, event.Type) +} diff --git a/pkg/yurthub/multiplexer/fake_multiplexer.go b/pkg/yurthub/multiplexer/fake_multiplexer.go new file mode 100644 index 00000000000..3e8240bfddd --- /dev/null +++ b/pkg/yurthub/multiplexer/fake_multiplexer.go @@ -0,0 +1,39 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import "k8s.io/apimachinery/pkg/runtime/schema" + +type FakeCacheManager struct { + cacheMap map[string]Interface + resourceConfigMap map[string]*ResourceCacheConfig +} + +func NewFakeCacheManager(cacheMap map[string]Interface, resourceConfigMap map[string]*ResourceCacheConfig) *FakeCacheManager { + return &FakeCacheManager{ + cacheMap: cacheMap, + resourceConfigMap: resourceConfigMap, + } +} + +func (fcm *FakeCacheManager) ResourceCacheConfig(gvr *schema.GroupVersionResource) (*ResourceCacheConfig, error) { + return fcm.resourceConfigMap[gvr.String()], nil +} + +func (fcm *FakeCacheManager) ResourceCache(gvr *schema.GroupVersionResource) (Interface, func(), error) { + return fcm.cacheMap[gvr.String()], nil, nil +} diff --git a/pkg/yurthub/multiplexer/multiplexer.go b/pkg/yurthub/multiplexer/multiplexer.go new file mode 100644 index 00000000000..c442ee0b3d0 --- /dev/null +++ b/pkg/yurthub/multiplexer/multiplexer.go @@ -0,0 +1,212 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import ( + "sync" + + "github.com/pkg/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes/scheme" + + kmeta "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" + ystorage "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage" +) + +var KeyFunc = func(obj runtime.Object) (string, error) { + accessor, err := meta.Accessor(obj) + if err != nil { + return "", err + } + + name := accessor.GetName() + if len(name) == 0 { + return "", apierrors.NewBadRequest("Name parameter required.") + } + + ns := accessor.GetNamespace() + if len(ns) == 0 { + return "/" + name, nil + } + return "/" + ns + "/" + name, nil +} + +var AttrsFunc = func(obj runtime.Object) (labels.Set, fields.Set, error) { + metadata, err := meta.Accessor(obj) + if err != nil { + return nil, nil, err + } + + var fieldSet fields.Set + if len(metadata.GetNamespace()) > 0 { + fieldSet = fields.Set{ + "metadata.name": metadata.GetName(), + "metadata.namespace": metadata.GetNamespace(), + } + } else { + fieldSet = fields.Set{ + "metadata.name": metadata.GetName(), + } + } + + return labels.Set(metadata.GetLabels()), fieldSet, nil +} + +type MultiplexerManager interface { + ResourceCacheConfig(gvr *schema.GroupVersionResource) (*ResourceCacheConfig, error) + ResourceCache(gvr *schema.GroupVersionResource) (Interface, func(), error) +} + +type multiplexerManager struct { + restStoreManager ystorage.StorageProvider + restMapper meta.RESTMapper + + cacheLock sync.RWMutex + gvrToCache map[string]Interface + gvrToCacheDestroyFunc map[string]func() + + cacheConfigLock sync.RWMutex + gvrToCacheConfig map[string]*ResourceCacheConfig +} + +func NewRequestsMultiplexerManager( + restStoreManager ystorage.StorageProvider) MultiplexerManager { + + return &multiplexerManager{ + restStoreManager: restStoreManager, + restMapper: kmeta.NewDefaultRESTMapperFromScheme(), + gvrToCache: make(map[string]Interface), + gvrToCacheConfig: make(map[string]*ResourceCacheConfig), + gvrToCacheDestroyFunc: make(map[string]func()), + cacheLock: sync.RWMutex{}, + cacheConfigLock: sync.RWMutex{}, + } +} +func (m *multiplexerManager) ResourceCacheConfig(gvr *schema.GroupVersionResource) (*ResourceCacheConfig, error) { + if config, ok := m.tryGetResourceCacheConfig(gvr); ok { + return config, nil + } + + gvk, listGVK, err := m.convertToGVK(gvr) + if err != nil { + return nil, errors.Wrapf(err, "failed to convert to gvk from gvr %s", gvr.String()) + } + + config := m.newResourceCacheConfig(gvk, listGVK) + + m.saveResourceCacheConfig(gvr, config) + return config, nil +} + +func (m *multiplexerManager) tryGetResourceCacheConfig(gvr *schema.GroupVersionResource) (*ResourceCacheConfig, bool) { + m.cacheConfigLock.RLock() + defer m.cacheConfigLock.RUnlock() + + if config, ok := m.gvrToCacheConfig[gvr.String()]; ok { + return config, true + } + + return nil, false +} + +func (m *multiplexerManager) convertToGVK(gvr *schema.GroupVersionResource) (schema.GroupVersionKind, schema.GroupVersionKind, error) { + gvk, err := m.restMapper.KindFor(*gvr) + if err != nil { + return schema.GroupVersionKind{}, schema.GroupVersionKind{}, errors.Wrapf(err, "failed to convert gvk from gvr %s", gvr.String()) + } + + listGvk := schema.GroupVersionKind{ + Group: gvr.Group, + Version: gvr.Version, + Kind: gvk.Kind + "List", + } + + return gvk, listGvk, nil +} + +func (m *multiplexerManager) newResourceCacheConfig(gvk schema.GroupVersionKind, + listGVK schema.GroupVersionKind) *ResourceCacheConfig { + resourceCacheConfig := &ResourceCacheConfig{ + NewFunc: func() runtime.Object { + obj, _ := scheme.Scheme.New(gvk) + return obj + }, + NewListFunc: func() (object runtime.Object) { + objList, _ := scheme.Scheme.New(listGVK) + return objList + }, + KeyFunc: KeyFunc, + GetAttrsFunc: AttrsFunc, + } + + return resourceCacheConfig +} + +func (m *multiplexerManager) saveResourceCacheConfig(gvr *schema.GroupVersionResource, config *ResourceCacheConfig) { + m.cacheConfigLock.Lock() + defer m.cacheConfigLock.Unlock() + + m.gvrToCacheConfig[gvr.String()] = config +} + +func (m *multiplexerManager) ResourceCache(gvr *schema.GroupVersionResource) (Interface, func(), error) { + if sc, destroy, ok := m.tryGetResourceCache(gvr); ok { + return sc, destroy, nil + } + + restStore, err := m.restStoreManager.ResourceStorage(gvr) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to get rest store") + } + + resourceCacheConfig, err := m.ResourceCacheConfig(gvr) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to generate resource cache config") + } + + sc, destroy, err := NewResourceCache(restStore, gvr, resourceCacheConfig) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to new resource cache") + } + + m.saveResourceCache(gvr, sc, destroy) + + return sc, destroy, nil +} + +func (m *multiplexerManager) tryGetResourceCache(gvr *schema.GroupVersionResource) (Interface, func(), bool) { + m.cacheLock.RLock() + defer m.cacheLock.RUnlock() + + if sc, ok := m.gvrToCache[gvr.String()]; ok { + return sc, m.gvrToCacheDestroyFunc[gvr.String()], true + } + return nil, nil, false +} + +func (m *multiplexerManager) saveResourceCache(gvr *schema.GroupVersionResource, sc Interface, destroy func()) { + m.cacheLock.Lock() + defer m.cacheLock.Unlock() + + m.gvrToCache[gvr.String()] = sc + m.gvrToCacheDestroyFunc[gvr.String()] = destroy +} diff --git a/pkg/yurthub/multiplexer/multiplexer_test.go b/pkg/yurthub/multiplexer/multiplexer_test.go new file mode 100644 index 00000000000..6cd0865dace --- /dev/null +++ b/pkg/yurthub/multiplexer/multiplexer_test.go @@ -0,0 +1,191 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import ( + "context" + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + kstorage "k8s.io/apiserver/pkg/storage" + + "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage" +) + +func TestShareCacheManager_ResourceCacheConfig(t *testing.T) { + svcStorage := storage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")}) + storageMap := map[string]kstorage.Interface{ + serviceGVR.String(): svcStorage, + } + + sm := NewRequestsMultiplexerManager( + storage.NewDummyStorageManager(storageMap)) + + for _, tc := range []struct { + tname string + gvr *schema.GroupVersionResource + obj runtime.Object + expectedKey string + expectedObjType string + expectedObjListType string + expectedFieldSet fields.Set + namespaceScoped bool + }{ + { + "generate resource config for services", + &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "services", + }, + newService(metav1.NamespaceSystem, "coredns"), + "/kube-system/coredns", + "Service", + "ServiceList", + fields.Set{ + "metadata.name": "coredns", + "metadata.namespace": "kube-system", + }, + true, + }, + { + "generate resource config for endpointslices", + &schema.GroupVersionResource{ + Group: "discovery.k8s.io", + Version: "v1", + Resource: "endpointslices", + }, + newEndpointSlice(), + "/kube-system/coredns-12345", + "EndpointSlice", + "EndpointSliceList", + fields.Set{ + "metadata.name": "coredns-12345", + "metadata.namespace": "kube-system", + }, + true, + }, + { + "generate resource config for nodes", + &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "nodes", + }, + newNode(), + "/test", + "Node", + "NodeList", + fields.Set{ + "metadata.name": "test", + }, + false, + }, + } { + t.Run(tc.tname, func(t *testing.T) { + rc, err := sm.ResourceCacheConfig(tc.gvr) + + assert.Nil(t, err) + + key, _ := rc.KeyFunc(tc.obj) + assert.Equal(t, tc.expectedKey, key) + + obj := rc.NewFunc() + assert.Equal(t, tc.expectedObjType, reflect.TypeOf(obj).Elem().Name()) + + objList := rc.NewListFunc() + assert.Equal(t, tc.expectedObjListType, reflect.TypeOf(objList).Elem().Name()) + + _, fieldSet, _ := rc.GetAttrsFunc(tc.obj) + assert.Equal(t, tc.expectedFieldSet, fieldSet) + }) + } +} +func newService(namespace, name string) *v1.Service { + return &v1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + } +} + +func newEndpointSlice() *discovery.EndpointSlice { + return &discovery.EndpointSlice{ + TypeMeta: metav1.TypeMeta{ + Kind: "EndpointSlice", + APIVersion: "discovery.k8s.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "kube-system", + Name: "coredns-12345", + }, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{"192.168.0.10"}, + }, + }, + } +} + +func newNode() *v1.Node { + return &v1.Node{ + TypeMeta: metav1.TypeMeta{ + Kind: "Node", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + } +} + +func TestShareCacheManager_ResourceCache(t *testing.T) { + svcStorage := storage.NewFakeServiceStorage( + []v1.Service{ + *newService(metav1.NamespaceSystem, "coredns"), + *newService(metav1.NamespaceDefault, "nginx"), + }) + + storageMap := map[string]kstorage.Interface{ + serviceGVR.String(): svcStorage, + } + + dsm := storage.NewDummyStorageManager(storageMap) + scm := NewRequestsMultiplexerManager(dsm) + cache, _, _ := scm.ResourceCache(serviceGVR) + + serviceList := &v1.ServiceList{} + err := cache.GetList(context.Background(), "", mockListOptions(), serviceList) + + assert.Nil(t, err) + assert.Equal(t, []v1.Service{ + *newService(metav1.NamespaceDefault, "nginx"), + *newService(metav1.NamespaceSystem, "coredns"), + }, serviceList.Items) +} diff --git a/pkg/yurthub/multiplexer/storage/api_server_storage.go b/pkg/yurthub/multiplexer/storage/api_server_storage.go new file mode 100644 index 00000000000..f705ffbb659 --- /dev/null +++ b/pkg/yurthub/multiplexer/storage/api_server_storage.go @@ -0,0 +1,103 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "context" + "math/rand" + + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/storage" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" +) + +const minWatchRequestSeconds = 300 + +var ErrNoSupport = errors.New("Don't Support Method ") + +type apiServerStorage struct { + restClient rest.Interface + resource string +} + +func NewStorage(restClient rest.Interface, resource string) storage.Interface { + return &apiServerStorage{ + restClient: restClient, + resource: resource, + } +} + +func (rs *apiServerStorage) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + listOpts := &metav1.ListOptions{ + Limit: opts.Predicate.Limit, + Continue: opts.Predicate.Continue, + ResourceVersionMatch: opts.ResourceVersionMatch, + ResourceVersion: opts.ResourceVersion, + } + + return rs.restClient.Get().Resource(rs.resource).VersionedParams(listOpts, scheme.ParameterCodec).Do(ctx).Into(listObj) +} + +func (rs *apiServerStorage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { + timeoutSeconds := int64(float64(minWatchRequestSeconds) * (rand.Float64() + 1.0)) + + listOpts := &metav1.ListOptions{ + ResourceVersion: opts.ResourceVersion, + Watch: true, + TimeoutSeconds: &timeoutSeconds, + } + + w, err := rs.restClient.Get().Resource(rs.resource).VersionedParams(listOpts, scheme.ParameterCodec).Watch(ctx) + + return w, err +} + +func (rs *apiServerStorage) Versioner() storage.Versioner { + return nil +} + +func (rs *apiServerStorage) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { + return ErrNoSupport +} + +func (rs *apiServerStorage) Delete( + ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, + validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error { + return ErrNoSupport +} + +func (rs *apiServerStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { + return ErrNoSupport +} + +func (rs *apiServerStorage) GuaranteedUpdate( + ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool, + preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error { + return ErrNoSupport +} + +func (rs *apiServerStorage) Count(key string) (int64, error) { + return 0, ErrNoSupport +} + +func (rs *apiServerStorage) RequestWatchProgress(ctx context.Context) error { + return ErrNoSupport +} diff --git a/pkg/yurthub/multiplexer/storage/api_server_storage_provider.go b/pkg/yurthub/multiplexer/storage/api_server_storage_provider.go new file mode 100644 index 00000000000..81e0b14fdbf --- /dev/null +++ b/pkg/yurthub/multiplexer/storage/api_server_storage_provider.go @@ -0,0 +1,80 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/storage" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" +) + +type StorageProvider interface { + ResourceStorage(gvr *schema.GroupVersionResource) (storage.Interface, error) +} + +type apiServerStorageProvider struct { + config *rest.Config + gvrToStorage map[string]storage.Interface +} + +func NewStorageManager(config *rest.Config) StorageProvider { + config.NegotiatedSerializer = scheme.Codecs.WithoutConversion() + return &apiServerStorageProvider{ + config: config, + gvrToStorage: make(map[string]storage.Interface), + } +} + +func (sm *apiServerStorageProvider) ResourceStorage(gvr *schema.GroupVersionResource) (storage.Interface, error) { + if rs, ok := sm.gvrToStorage[gvr.String()]; ok { + return rs, nil + } + + restClient, err := sm.restClient(gvr) + if err != nil { + return nil, errors.Wrapf(err, "failed to get rest client for %v", gvr) + } + + rs := NewStorage(restClient, gvr.Resource) + sm.gvrToStorage[gvr.String()] = rs + + return rs, nil +} + +func (sm *apiServerStorageProvider) restClient(gvr *schema.GroupVersionResource) (rest.Interface, error) { + httpClient, err := rest.HTTPClientFor(sm.config) + if err != nil { + return nil, errors.Wrapf(err, "failed to get reset http client") + } + + configShallowCopy := *sm.config + configShallowCopy.APIPath = getAPIPath(gvr) + + gv := gvr.GroupVersion() + configShallowCopy.GroupVersion = &gv + + return rest.RESTClientForConfigAndClient(&configShallowCopy, httpClient) +} + +func getAPIPath(gvr *schema.GroupVersionResource) string { + if gvr.Group == "" { + return "/api" + } + return "/apis" +} diff --git a/pkg/yurthub/multiplexer/storage/api_server_storage_provider_test.go b/pkg/yurthub/multiplexer/storage/api_server_storage_provider_test.go new file mode 100644 index 00000000000..90aaabdc187 --- /dev/null +++ b/pkg/yurthub/multiplexer/storage/api_server_storage_provider_test.go @@ -0,0 +1,78 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/storage" + "k8s.io/client-go/rest" +) + +var serviceGVR = &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "services", +} + +var endpointSlicesGVR = &schema.GroupVersionResource{ + Group: "discovery.k8s.io", + Version: "v1", + Resource: "endpointslices", +} + +func TestStorageManager_ResourceStorage(t *testing.T) { + sm := NewStorageManager(&rest.Config{ + Host: "http://127.0.0.1:10261", + UserAgent: "share-hub", + }) + + for _, tc := range []struct { + tName string + gvr *schema.GroupVersionResource + Err error + }{ + { + "get resource storage for services", + serviceGVR, + nil, + }, + { + "get resource storage for endpouintslices", + endpointSlicesGVR, + nil, + }, + } { + t.Run(tc.tName, func(t *testing.T) { + restore, err := sm.ResourceStorage(tc.gvr) + + assert.Nil(t, err) + assertResourceStore(t, tc.gvr, restore) + }) + } +} + +func assertResourceStore(t testing.TB, gvr *schema.GroupVersionResource, getRestStore storage.Interface) { + t.Helper() + + store, ok := getRestStore.(*apiServerStorage) + assert.Equal(t, true, ok) + assert.Equal(t, gvr.Resource, store.resource) + assert.Equal(t, gvr.GroupVersion(), store.restClient.APIVersion()) +} diff --git a/pkg/yurthub/multiplexer/storage/api_server_storage_test.go b/pkg/yurthub/multiplexer/storage/api_server_storage_test.go new file mode 100644 index 00000000000..ac9cdc6824f --- /dev/null +++ b/pkg/yurthub/multiplexer/storage/api_server_storage_test.go @@ -0,0 +1,267 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "bytes" + "context" + "io" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer/streaming" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/storage" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest/fake" +) + +var ( + corev1GV = schema.GroupVersion{Version: "v1"} + corev1Codec = scheme.Codecs.CodecForVersions(scheme.Codecs.LegacyCodec(corev1GV), scheme.Codecs.UniversalDecoder(corev1GV), corev1GV, corev1GV) + + discoveryGV = schema.GroupVersion{Group: "discovery.k8s.io", Version: "v1"} + discoveryv1Codec = scheme.Codecs.CodecForVersions(scheme.Codecs.LegacyCodec(discoveryGV), scheme.Codecs.UniversalDecoder(discoveryGV), discoveryGV, discoveryGV) +) + +func TestRestStore_GetList(t *testing.T) { + t.Run(" list services", func(t *testing.T) { + rs := &apiServerStorage{ + restClient: newFakeClient(corev1GV, mockServiceListBody(), newListHeader()), + } + + getListObj := &corev1.ServiceList{} + err := rs.GetList(context.Background(), "", storage.ListOptions{}, getListObj) + + assert.Nil(t, err) + assert.Equal(t, 1, len(getListObj.Items)) + }) + + t.Run("list endpointslices", func(t *testing.T) { + rs := &apiServerStorage{ + restClient: newFakeClient(corev1GV, mockEndpointSlicesListBody(), newListHeader()), + } + + getListObj := &discovery.EndpointSliceList{} + err := rs.GetList(context.Background(), "", storage.ListOptions{}, getListObj) + + assert.Nil(t, err) + assert.Equal(t, 1, len(getListObj.Items)) + }) +} + +func newListHeader() http.Header { + header := http.Header{} + header.Set("Content-Type", runtime.ContentTypeJSON) + return header +} + +func mockServiceListBody() []byte { + str := runtime.EncodeOrDie(corev1Codec, newServiceList()) + return []byte(str) +} + +func mockEndpointSlicesListBody() []byte { + str := runtime.EncodeOrDie(discoveryv1Codec, newEndpointSliceList()) + return []byte(str) +} + +func newServiceList() *corev1.ServiceList { + return &corev1.ServiceList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + Items: []corev1.Service{ + *newService(), + }, + } +} + +func newFakeClient(gv schema.GroupVersion, body []byte, header http.Header) *fake.RESTClient { + return &fake.RESTClient{ + GroupVersion: gv, + NegotiatedSerializer: scheme.Codecs.WithoutConversion(), + Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Header: header, + Body: io.NopCloser(bytes.NewReader(body)), + }, nil + }), + } +} + +func newEndpointSliceList() *discovery.EndpointSliceList { + return &discovery.EndpointSliceList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + Items: []discovery.EndpointSlice{ + newEndpointSlice(), + }, + } +} + +func newEndpointSlice() discovery.EndpointSlice { + return discovery.EndpointSlice{ + TypeMeta: metav1.TypeMeta{ + Kind: "EndpointSlice", + APIVersion: "discovery.k8s.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "coredns-12345", + Namespace: "kube-system", + }, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{"192.168.0.1"}, + NodeName: newStringPointer("node1"), + }, + { + Addresses: []string{"192.168.1.1"}, + NodeName: newStringPointer("node2"), + }, + { + Addresses: []string{"192.168.2.3"}, + NodeName: newStringPointer("node3"), + }, + }, + } +} + +func newStringPointer(str string) *string { + return &str +} + +func TestRestStore_Watch(t *testing.T) { + rs := &apiServerStorage{ + restClient: newFakeClient(corev1GV, mockServiceWatchBody(), newWatchHeader()), + } + + resultCh, err := rs.Watch(context.Background(), "", storage.ListOptions{}) + event := <-resultCh.ResultChan() + + assert.Nil(t, err) + assert.Equal(t, event.Type, watch.Added) +} + +func newWatchHeader() http.Header { + header := http.Header{} + header.Set("Transfer-Encoding", "chunked") + header.Set("Content-Type", runtime.ContentTypeJSON) + return header +} + +func mockServiceWatchBody() []byte { + serializer := scheme.Codecs.SupportedMediaTypes()[0] + framer := serializer.StreamSerializer.Framer + streamSerializer := serializer.StreamSerializer.Serializer + encoder := scheme.Codecs.EncoderForVersion(streamSerializer, corev1GV) + + buf := &bytes.Buffer{} + fb := framer.NewFrameWriter(buf) + + e := streaming.NewEncoder(fb, encoder) + + e.Encode(newOutEvent(newService())) + + return buf.Bytes() +} + +func newOutEvent(object runtime.Object) *metav1.WatchEvent { + internalEvent := metav1.InternalEvent{ + Type: watch.Added, + Object: object, + } + + outEvent := &metav1.WatchEvent{} + metav1.Convert_v1_InternalEvent_To_v1_WatchEvent(&internalEvent, outEvent, nil) + + return outEvent +} + +func TestRestStore_Versioner(t *testing.T) { + rs := &apiServerStorage{} + + assert.Nil(t, rs.Versioner()) +} + +func TestRestStore_Create(t *testing.T) { + rs := &apiServerStorage{} + err := rs.Create(context.TODO(), "", newService(), newService(), 1) + + assert.Equal(t, ErrNoSupport, err) +} + +func TestRestStore_Delete(t *testing.T) { + rs := &apiServerStorage{} + err := rs.Delete(context.TODO(), "", newService(), nil, nil, nil) + + assert.Equal(t, ErrNoSupport, err) +} + +func TestRestStore_Get(t *testing.T) { + rs := &apiServerStorage{} + err := rs.Get(context.TODO(), "", storage.GetOptions{}, nil) + + assert.Equal(t, ErrNoSupport, err) +} + +func TestRestStore_GuaranteedUpdate(t *testing.T) { + rs := &apiServerStorage{} + err := rs.GuaranteedUpdate(context.TODO(), "", newService(), false, nil, nil, nil) + + assert.Equal(t, ErrNoSupport, err) +} + +func TestRestStore_Count(t *testing.T) { + rs := &apiServerStorage{} + _, err := rs.Count("") + + assert.Equal(t, ErrNoSupport, err) +} + +func TestRestStore_RequestWatchProgress(t *testing.T) { + rs := &apiServerStorage{} + err := rs.RequestWatchProgress(context.TODO()) + + assert.Equal(t, ErrNoSupport, err) +} + +func newService() *corev1.Service { + return &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "kube-dns", + Namespace: "kube-system", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "192.168.0.10", + }, + } +} diff --git a/pkg/yurthub/multiplexer/storage/fake_storage.go b/pkg/yurthub/multiplexer/storage/fake_storage.go new file mode 100644 index 00000000000..b2123bb2374 --- /dev/null +++ b/pkg/yurthub/multiplexer/storage/fake_storage.go @@ -0,0 +1,135 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "context" + "fmt" + "strings" + + v1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/storage" +) + +type CommonFakeStorage struct { +} + +func (fs *CommonFakeStorage) Versioner() storage.Versioner { + return nil +} + +func (fs *CommonFakeStorage) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { + return nil +} + +func (fs *CommonFakeStorage) Delete( + ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, + validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error { + return nil +} + +func (fs *CommonFakeStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { + return nil +} + +func (fs *CommonFakeStorage) GuaranteedUpdate( + ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool, + preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error { + return nil +} + +func (fs *CommonFakeStorage) Count(key string) (int64, error) { + return 0, nil +} + +func (fs *CommonFakeStorage) RequestWatchProgress(ctx context.Context) error { + return nil +} + +type FakeServiceStorage struct { + *CommonFakeStorage + items []v1.Service + watcher *watch.FakeWatcher +} + +func NewFakeServiceStorage(items []v1.Service) *FakeServiceStorage { + return &FakeServiceStorage{ + CommonFakeStorage: &CommonFakeStorage{}, + items: items, + watcher: watch.NewFake(), + } +} + +func (fs *FakeServiceStorage) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + serviceList := listObj.(*v1.ServiceList) + serviceList.ListMeta = metav1.ListMeta{ + ResourceVersion: "100", + } + serviceList.Items = fs.items + return nil +} + +func (fs *FakeServiceStorage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { + return fs.watcher, nil +} + +func (fs *FakeServiceStorage) AddWatchObject(svc *v1.Service) { + svc.ResourceVersion = "101" + fs.watcher.Add(svc) +} + +type FakeEndpointSliceStorage struct { + *CommonFakeStorage + items []discovery.EndpointSlice + watcher *watch.FakeWatcher +} + +func NewFakeEndpointSliceStorage(items []discovery.EndpointSlice) *FakeEndpointSliceStorage { + return &FakeEndpointSliceStorage{ + CommonFakeStorage: &CommonFakeStorage{}, + items: items, + watcher: watch.NewFake(), + } +} + +func (fs *FakeEndpointSliceStorage) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + epsList := listObj.(*discovery.EndpointSliceList) + epsList.ListMeta = metav1.ListMeta{ + ResourceVersion: "100", + } + + for _, item := range fs.items { + itemKey := fmt.Sprintf("/%s/%s", item.Namespace, item.Name) + if strings.HasPrefix(itemKey, key) { + epsList.Items = append(epsList.Items, item) + } + } + return nil +} + +func (fs *FakeEndpointSliceStorage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { + return fs.watcher, nil +} + +func (fs *FakeEndpointSliceStorage) AddWatchObject(eps *discovery.EndpointSlice) { + eps.ResourceVersion = "101" + fs.watcher.Add(eps) +} diff --git a/pkg/yurthub/multiplexer/storage/fake_storage_provider.go b/pkg/yurthub/multiplexer/storage/fake_storage_provider.go new file mode 100644 index 00000000000..30dcff91f3d --- /dev/null +++ b/pkg/yurthub/multiplexer/storage/fake_storage_provider.go @@ -0,0 +1,42 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/storage" +) + +type DummyStorageManager struct { + StorageMap map[string]storage.Interface + Err error +} + +func NewDummyStorageManager(storageMap map[string]storage.Interface) *DummyStorageManager { + return &DummyStorageManager{ + StorageMap: storageMap, + Err: nil, + } +} + +func (dsm *DummyStorageManager) ResourceStorage(gvr *schema.GroupVersionResource) (storage.Interface, error) { + if store, ok := dsm.StorageMap[gvr.String()]; ok { + return store, dsm.Err + } + + return dsm.StorageMap[gvr.String()], dsm.Err +} diff --git a/pkg/yurthub/proxy/autonomy/autonomy_test.go b/pkg/yurthub/proxy/autonomy/autonomy_test.go index 27aefdb3c6e..b4741b3437c 100644 --- a/pkg/yurthub/proxy/autonomy/autonomy_test.go +++ b/pkg/yurthub/proxy/autonomy/autonomy_test.go @@ -49,7 +49,7 @@ func TestHttpServeKubeletGetNode(t *testing.T) { } storageWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM := cachemanager.NewCacheManager(storageWrapper, serializerM, nil, fakeSharedInformerFactory) + cacheM := cachemanager.NewCacheManager("node1", storageWrapper, serializerM, nil, fakeSharedInformerFactory) autonomyProxy := NewAutonomyProxy(nil, cacheM) diff --git a/pkg/yurthub/proxy/local/local_test.go b/pkg/yurthub/proxy/local/local_test.go index f8b9eaa71fd..96208acc4d5 100644 --- a/pkg/yurthub/proxy/local/local_test.go +++ b/pkg/yurthub/proxy/local/local_test.go @@ -65,7 +65,7 @@ func TestServeHTTPForWatch(t *testing.T) { } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM := cachemanager.NewCacheManager(sWrapper, serializerM, nil, fakeSharedInformerFactory) + cacheM := cachemanager.NewCacheManager("node1", sWrapper, serializerM, nil, fakeSharedInformerFactory) fn := func() bool { return false @@ -157,7 +157,7 @@ func TestServeHTTPForWatchWithHealthyChange(t *testing.T) { } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM := cachemanager.NewCacheManager(sWrapper, serializerM, nil, fakeSharedInformerFactory) + cacheM := cachemanager.NewCacheManager("node1", sWrapper, serializerM, nil, fakeSharedInformerFactory) cnt := 0 fn := func() bool { @@ -242,7 +242,7 @@ func TestServeHTTPForWatchWithMinRequestTimeout(t *testing.T) { } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM := cachemanager.NewCacheManager(sWrapper, serializerM, nil, fakeSharedInformerFactory) + cacheM := cachemanager.NewCacheManager("node1", sWrapper, serializerM, nil, fakeSharedInformerFactory) fn := func() bool { return false @@ -334,7 +334,7 @@ func TestServeHTTPForPost(t *testing.T) { } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM := cachemanager.NewCacheManager(sWrapper, serializerM, nil, fakeSharedInformerFactory) + cacheM := cachemanager.NewCacheManager("node1", sWrapper, serializerM, nil, fakeSharedInformerFactory) fn := func() bool { return false @@ -414,7 +414,7 @@ func TestServeHTTPForDelete(t *testing.T) { } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM := cachemanager.NewCacheManager(sWrapper, serializerM, nil, fakeSharedInformerFactory) + cacheM := cachemanager.NewCacheManager("node1", sWrapper, serializerM, nil, fakeSharedInformerFactory) fn := func() bool { return false @@ -481,7 +481,7 @@ func TestServeHTTPForGetReqCache(t *testing.T) { } sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() - cacheM := cachemanager.NewCacheManager(sWrapper, serializerM, nil, fakeSharedInformerFactory) + cacheM := cachemanager.NewCacheManager("node1", sWrapper, serializerM, nil, fakeSharedInformerFactory) fn := func() bool { return false @@ -634,7 +634,7 @@ func TestServeHTTPForListReqCache(t *testing.T) { sWrapper := cachemanager.NewStorageWrapper(dStorage) serializerM := serializer.NewSerializerManager() restRESTMapperMgr, _ := hubmeta.NewRESTMapperManager(rootDir) - cacheM := cachemanager.NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory) + cacheM := cachemanager.NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory) fn := func() bool { return false diff --git a/pkg/yurthub/proxy/multiplexer/filterwatch.go b/pkg/yurthub/proxy/multiplexer/filterwatch.go new file mode 100644 index 00000000000..b79836f5f1b --- /dev/null +++ b/pkg/yurthub/proxy/multiplexer/filterwatch.go @@ -0,0 +1,93 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import ( + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" + + yurtutil "github.com/openyurtio/openyurt/pkg/util" + "github.com/openyurtio/openyurt/pkg/yurthub/filter" +) + +type filterWatch struct { + source watch.Interface + filter filter.ObjectFilter + result chan watch.Event + done chan struct{} +} + +func (f *filterWatch) Stop() { + select { + case <-f.done: + default: + close(f.done) + f.source.Stop() + } +} + +func newFilterWatch(source watch.Interface, filter filter.ObjectFilter) watch.Interface { + if filter == nil { + return source + } + + fw := &filterWatch{ + source: source, + filter: filter, + result: make(chan watch.Event), + done: make(chan struct{}), + } + + go fw.receive() + + return fw +} + +func (f *filterWatch) ResultChan() <-chan watch.Event { + return f.result +} + +func (f *filterWatch) receive() { + defer utilruntime.HandleCrash() + defer close(f.result) + defer f.Stop() + + for result := range f.source.ResultChan() { + watchType := result.Type + newObj := result.Object + if co, ok := newObj.(runtime.CacheableObject); ok { + newObj = co.GetObject() + } + + if !(result.Type == watch.Bookmark || result.Type == watch.Error) { + if newObj = f.filter.Filter(newObj, f.done); yurtutil.IsNil(newObj) { + watchType = watch.Deleted + newObj = result.Object + } + } + + select { + case <-f.done: + return + case f.result <- watch.Event{ + Type: watchType, + Object: newObj, + }: + } + } +} diff --git a/pkg/yurthub/proxy/multiplexer/filterwatch_test.go b/pkg/yurthub/proxy/multiplexer/filterwatch_test.go new file mode 100644 index 00000000000..d72d0c1596b --- /dev/null +++ b/pkg/yurthub/proxy/multiplexer/filterwatch_test.go @@ -0,0 +1,108 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import ( + "testing" + + "github.com/stretchr/testify/assert" + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + + ctesting "github.com/openyurtio/openyurt/pkg/yurthub/proxy/multiplexer/testing" +) + +func TestFilterWatch_ResultChan(t *testing.T) { + t.Run("test filter endpointslices", func(t *testing.T) { + source := watch.NewFake() + filter := &ctesting.IgnoreEndpointslicesWithNodeName{IgnoreNodeName: "node1"} + fw := newFilterWatch(source, filter) + + go func() { + source.Add(mockEndpointslices()) + }() + + assertFilterWatchEvent(t, fw) + }) + + t.Run("test cacheable object", func(t *testing.T) { + source := watch.NewFake() + filter := &ctesting.IgnoreEndpointslicesWithNodeName{IgnoreNodeName: "node1"} + + fw := newFilterWatch(source, filter) + + go func() { + source.Add(mockCacheableObject()) + }() + + assertFilterWatchEvent(t, fw) + }) +} + +func mockEndpointslices() *discoveryv1.EndpointSlice { + node1 := "node1" + node2 := "node2" + + return &discoveryv1.EndpointSlice{ + TypeMeta: metav1.TypeMeta{ + Kind: "EndpointSlice", + APIVersion: "discoveryv1discoveryv1.k8s.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "coredns-12345", + Namespace: "kube-system", + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"172.16.0.1"}, + NodeName: &node1, + }, + { + Addresses: []string{"172.17.0.1"}, + NodeName: &node2, + }, + }, + } +} + +func assertFilterWatchEvent(t testing.TB, fw watch.Interface) { + t.Helper() + + event := <-fw.ResultChan() + endpointslice, ok := event.Object.(*discoveryv1.EndpointSlice) + + assert.Equal(t, true, ok) + assert.Equal(t, 1, len(endpointslice.Endpoints)) + assert.Equal(t, *endpointslice.Endpoints[0].NodeName, "node2") +} + +func mockCacheableObject() *ctesting.MockCacheableObject { + return &ctesting.MockCacheableObject{ + Obj: mockEndpointslices(), + } +} + +func TestFilterWatch_Stop(t *testing.T) { + source := watch.NewFake() + filter := &ctesting.IgnoreEndpointslicesWithNodeName{IgnoreNodeName: "node1"} + fw := newFilterWatch(source, filter) + + fw.Stop() + + assert.Equal(t, true, source.IsStopped()) +} diff --git a/pkg/yurthub/proxy/multiplexer/multiplexerlist.go b/pkg/yurthub/proxy/multiplexer/multiplexerlist.go new file mode 100644 index 00000000000..acdaaf889e8 --- /dev/null +++ b/pkg/yurthub/proxy/multiplexer/multiplexerlist.go @@ -0,0 +1,159 @@ +/* +Copyright 2024 The OpenYurt Authors. +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import ( + "net/http" + + "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/generic/registry" + kstorage "k8s.io/apiserver/pkg/storage" + "k8s.io/klog/v2" + + yurtutil "github.com/openyurtio/openyurt/pkg/util" + "github.com/openyurtio/openyurt/pkg/yurthub/filter" + "github.com/openyurtio/openyurt/pkg/yurthub/util" +) + +func (sp *multiplexerProxy) multiplexerList(w http.ResponseWriter, r *http.Request, gvr *schema.GroupVersionResource) { + scope, err := sp.getReqScope(gvr) + if err != nil { + util.Err(errors.Wrapf(err, "failed to get request scope"), w, r) + return + } + + listOpts, err := sp.decodeListOptions(r, scope) + if err != nil { + util.Err(errors.Wrapf(err, "failed to decode list options, url: %v", r.URL), w, r) + return + } + + storageOpts, err := sp.storageOpts(listOpts, gvr) + if err != nil { + util.Err(err, w, r) + return + } + + obj, err := sp.listObject(r, gvr, storageOpts) + if err != nil { + util.Err(err, w, r) + return + } + + util.WriteObject(http.StatusOK, obj, w, r) +} + +func (sp *multiplexerProxy) listObject(r *http.Request, gvr *schema.GroupVersionResource, storageOpts *kstorage.ListOptions) (runtime.Object, error) { + rc, _, err := sp.requestsMultiplexerManager.ResourceCache(gvr) + if err != nil { + return nil, errors.Wrap(err, "failed to get resource cache") + } + + obj, err := sp.newListObject(gvr) + if err != nil { + return nil, errors.Wrapf(err, "failed to new list object") + } + + key, err := sp.getCacheKey(r, storageOpts) + if err != nil { + return nil, errors.Wrapf(err, "failed to get cache key") + } + + if err := rc.GetList(r.Context(), key, *storageOpts, obj); err != nil { + return nil, errors.Wrapf(err, "failed to get list from cache") + } + + if obj, err = sp.filterListObject(obj, sp.filterMgr.FindObjectFilters(r)); err != nil { + return nil, errors.Wrapf(err, "failed to filter list object") + } + + return obj, nil +} + +func (sp *multiplexerProxy) newListObject(gvr *schema.GroupVersionResource) (runtime.Object, error) { + rcc, err := sp.requestsMultiplexerManager.ResourceCacheConfig(gvr) + if err != nil { + return nil, errors.Wrapf(err, "failed to get resource cache config") + } + + return rcc.NewListFunc(), nil +} + +func (sp *multiplexerProxy) getCacheKey(r *http.Request, storageOpts *kstorage.ListOptions) (string, error) { + if ns := sp.getNamespace(r); len(ns) > 0 { + return sp.getNamespaceScopedCacheKey(r, storageOpts) + } + + return sp.getClusterScopedCacheKey(r, storageOpts) +} + +func (sp *multiplexerProxy) getNamespaceScopedCacheKey(r *http.Request, storageOpts *kstorage.ListOptions) (string, error) { + ctx := request.WithNamespace(r.Context(), sp.getNamespace(r)) + + if name, ok := storageOpts.Predicate.MatchesSingle(); ok { + return registry.NamespaceKeyFunc(ctx, "", name) + } + + return registry.NamespaceKeyRootFunc(ctx, ""), nil +} + +func (sp *multiplexerProxy) getNamespace(r *http.Request) string { + requestInfo, ok := request.RequestInfoFrom(r.Context()) + if !ok { + return "" + } + return requestInfo.Namespace +} + +func (sp *multiplexerProxy) getClusterScopedCacheKey(r *http.Request, storageOpts *kstorage.ListOptions) (string, error) { + if name, ok := storageOpts.Predicate.MatchesSingle(); ok { + return registry.NoNamespaceKeyFunc(r.Context(), "", name) + } + + return "", nil +} + +func (sp *multiplexerProxy) filterListObject(obj runtime.Object, filter filter.ObjectFilter) (runtime.Object, error) { + if yurtutil.IsNil(filter) { + return obj, nil + } + + items, err := meta.ExtractList(obj) + + if err != nil || len(items) == 0 { + return filter.Filter(obj, sp.stop), nil + } + + list := make([]runtime.Object, 0) + for _, item := range items { + newObj := filter.Filter(item, sp.stop) + if !yurtutil.IsNil(newObj) { + list = append(list, newObj) + } + } + + if err = meta.SetList(obj, list); err != nil { + klog.Warningf("filter %s doesn't work correctly, couldn't set list, %v.", filter.Name(), err) + } + + return obj, nil +} diff --git a/pkg/yurthub/proxy/multiplexer/multiplexerproxy.go b/pkg/yurthub/proxy/multiplexer/multiplexerproxy.go new file mode 100644 index 00000000000..95061ae70b1 --- /dev/null +++ b/pkg/yurthub/proxy/multiplexer/multiplexerproxy.go @@ -0,0 +1,245 @@ +/* +Copyright 2024 The OpenYurt Authors. +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import ( + "net/http" + + "github.com/pkg/errors" + kerrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" + metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme" + "k8s.io/apimachinery/pkg/apis/meta/internalversion/validation" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/authorization/authorizerfactory" + "k8s.io/apiserver/pkg/endpoints/handlers" + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" + kstorage "k8s.io/apiserver/pkg/storage" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/klog/v2" + + "github.com/openyurtio/openyurt/pkg/yurthub/filter" + "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer" + "github.com/openyurtio/openyurt/pkg/yurthub/util" +) + +type multiplexerProxy struct { + requestsMultiplexerManager multiplexer.MultiplexerManager + filterMgr filter.FilterFinder + stop <-chan struct{} +} + +func NewMultiplexerProxy(filterMgr filter.FilterFinder, + cacheManager multiplexer.MultiplexerManager, + multiplexerResources []schema.GroupVersionResource, + stop <-chan struct{}) (*multiplexerProxy, error) { + + sp := &multiplexerProxy{ + stop: stop, + requestsMultiplexerManager: cacheManager, + filterMgr: filterMgr, + } + + for _, gvr := range multiplexerResources { + if _, _, err := sp.requestsMultiplexerManager.ResourceCache(&gvr); err != nil { + return sp, errors.Wrapf(err, "failed to init resource cache for %s", gvr.String()) + } + } + + return sp, nil +} + +func (sp *multiplexerProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { + reqInfo, _ := request.RequestInfoFrom(r.Context()) + gvr := sp.getRequestGVR(reqInfo) + + switch reqInfo.Verb { + case "list": + sp.multiplexerList(w, r, gvr) + case "watch": + sp.multiplexerWatch(w, r, gvr) + default: + util.Err(errors.Errorf("Multiplexer proxy does not support the request method %s", reqInfo.Verb), w, r) + } +} + +func (sp *multiplexerProxy) getRequestGVR(reqInfo *request.RequestInfo) *schema.GroupVersionResource { + return &schema.GroupVersionResource{ + Group: reqInfo.APIGroup, + Version: reqInfo.APIVersion, + Resource: reqInfo.Resource, + } +} + +func (sp *multiplexerProxy) getReqScope(gvr *schema.GroupVersionResource) (*handlers.RequestScope, error) { + fqKindToRegister, err := sp.findKind(gvr) + if err != nil { + return nil, err + } + + return &handlers.RequestScope{ + Serializer: scheme.Codecs, + ParameterCodec: scheme.ParameterCodec, + Convertor: scheme.Scheme, + Defaulter: scheme.Scheme, + Typer: scheme.Scheme, + UnsafeConvertor: runtime.UnsafeObjectConvertor(scheme.Scheme), + Authorizer: authorizerfactory.NewAlwaysAllowAuthorizer(), + + EquivalentResourceMapper: runtime.NewEquivalentResourceRegistry(), + + // TODO: Check for the interface on storage + TableConvertor: rest.NewDefaultTableConvertor(gvr.GroupResource()), + + // TODO: This seems wrong for cross-group subresources. It makes an assumption that a subresource and its parent are in the same group version. Revisit this. + Resource: *gvr, + Kind: fqKindToRegister, + + HubGroupVersion: schema.GroupVersion{Group: fqKindToRegister.Group, Version: runtime.APIVersionInternal}, + + MetaGroupVersion: metav1.SchemeGroupVersion, + + MaxRequestBodyBytes: int64(3 * 1024 * 1024), + Namer: handlers.ContextBasedNaming{ + Namer: runtime.Namer(meta.NewAccessor()), + }, + }, nil +} + +func (sp *multiplexerProxy) findKind(gvr *schema.GroupVersionResource) (schema.GroupVersionKind, error) { + object, err := sp.newListObject(gvr) + if err != nil { + return schema.GroupVersionKind{}, errors.Wrapf(err, "failed to new list object") + } + + fqKinds, _, err := scheme.Scheme.ObjectKinds(object) + if err != nil { + return schema.GroupVersionKind{}, err + } + + for _, fqKind := range fqKinds { + if fqKind.Group == gvr.Group { + return fqKind, nil + } + } + + return schema.GroupVersionKind{}, nil +} + +func (sp *multiplexerProxy) decodeListOptions(req *http.Request, scope *handlers.RequestScope) (opts metainternalversion.ListOptions, err error) { + if err := metainternalversionscheme.ParameterCodec.DecodeParameters(req.URL.Query(), metav1.SchemeGroupVersion, &opts); err != nil { + return opts, err + } + + if errs := validation.ValidateListOptions(&opts, false); len(errs) > 0 { + err := kerrors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "ListOptions"}, "", errs) + return opts, err + } + + if opts.FieldSelector != nil { + fn := func(label, value string) (newLabel, newValue string, err error) { + return scope.Convertor.ConvertFieldLabel(scope.Kind, label, value) + } + if opts.FieldSelector, err = opts.FieldSelector.Transform(fn); err != nil { + return opts, kerrors.NewBadRequest(err.Error()) + } + } + + hasName := true + _, name, err := scope.Namer.Name(req) + if err != nil { + hasName = false + } + + if hasName { + nameSelector := fields.OneTermEqualSelector("metadata.name", name) + if opts.FieldSelector != nil && !opts.FieldSelector.Empty() { + selectedName, ok := opts.FieldSelector.RequiresExactMatch("metadata.name") + if !ok || name != selectedName { + return opts, kerrors.NewBadRequest("fieldSelector metadata.name doesn't match requested name") + } + } else { + opts.FieldSelector = nameSelector + } + } + + return opts, nil +} + +func (sp *multiplexerProxy) storageOpts(listOpts metainternalversion.ListOptions, gvr *schema.GroupVersionResource) (*kstorage.ListOptions, error) { + p := sp.selectionPredicate(listOpts, gvr) + + return &kstorage.ListOptions{ + ResourceVersion: getResourceVersion(listOpts), + ResourceVersionMatch: listOpts.ResourceVersionMatch, + Recursive: isRecursive(p), + Predicate: p, + SendInitialEvents: listOpts.SendInitialEvents, + }, nil +} + +func (sp *multiplexerProxy) selectionPredicate(listOpts metainternalversion.ListOptions, gvr *schema.GroupVersionResource) kstorage.SelectionPredicate { + label := labels.Everything() + if listOpts.LabelSelector != nil { + label = listOpts.LabelSelector + } + + field := fields.Everything() + if listOpts.FieldSelector != nil { + field = listOpts.FieldSelector + } + + return kstorage.SelectionPredicate{ + Label: label, + Field: field, + Limit: listOpts.Limit, + Continue: listOpts.Continue, + GetAttrs: sp.getAttrFunc(gvr), + AllowWatchBookmarks: listOpts.AllowWatchBookmarks, + } +} + +func getResourceVersion(opts metainternalversion.ListOptions) string { + if opts.ResourceVersion == "" { + return "0" + } + return opts.ResourceVersion +} + +func isRecursive(p kstorage.SelectionPredicate) bool { + if _, ok := p.MatchesSingle(); ok { + return false + } + return true +} + +func (sp *multiplexerProxy) getAttrFunc(gvr *schema.GroupVersionResource) kstorage.AttrFunc { + rcc, err := sp.requestsMultiplexerManager.ResourceCacheConfig(gvr) + if err != nil { + klog.Errorf("failed to get cache config for %v, error: %v", gvr, err) + return nil + } + + return rcc.GetAttrsFunc +} diff --git a/pkg/yurthub/proxy/multiplexer/multiplexerproxy_test.go b/pkg/yurthub/proxy/multiplexer/multiplexerproxy_test.go new file mode 100644 index 00000000000..10ff9f4b7f1 --- /dev/null +++ b/pkg/yurthub/proxy/multiplexer/multiplexerproxy_test.go @@ -0,0 +1,383 @@ +/* +Copyright 2024 The OpenYurt Authors. +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import ( + "bytes" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + discovery "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/server" + "k8s.io/client-go/kubernetes/scheme" + + "github.com/openyurtio/openyurt/pkg/yurthub/filter" + "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer" + "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage" + ctesting "github.com/openyurtio/openyurt/pkg/yurthub/proxy/multiplexer/testing" +) + +var ( + discoveryGV = schema.GroupVersion{Group: "discovery.k8s.io", Version: "v1"} + + endpointSliceGVR = discoveryGV.WithResource("endpointslices") +) + +var mockEndpoints = []discovery.Endpoint{ + { + Addresses: []string{"192.168.0.1"}, + NodeName: newStringPointer("node1"), + }, + { + Addresses: []string{"192.168.1.1"}, + NodeName: newStringPointer("node2"), + }, + { + Addresses: []string{"192.168.2.3"}, + NodeName: newStringPointer("node3"), + }, +} + +func mockCacheMap() map[string]multiplexer.Interface { + return map[string]multiplexer.Interface{ + endpointSliceGVR.String(): storage.NewFakeEndpointSliceStorage( + []discovery.EndpointSlice{ + *newEndpointSlice(metav1.NamespaceSystem, "coredns-12345", "", mockEndpoints), + *newEndpointSlice(metav1.NamespaceDefault, "nginx", "", mockEndpoints), + }, + ), + } +} + +func mockResourceCacheMap() map[string]*multiplexer.ResourceCacheConfig { + return map[string]*multiplexer.ResourceCacheConfig{ + endpointSliceGVR.String(): { + KeyFunc: multiplexer.KeyFunc, + NewListFunc: func() runtime.Object { + return &discovery.EndpointSliceList{} + }, + NewFunc: func() runtime.Object { + return &discovery.EndpointSlice{} + }, + GetAttrsFunc: multiplexer.AttrsFunc, + }, + } +} + +func newEndpointSlice(namespace string, name string, resourceVersion string, endpoints []discovery.Endpoint) *discovery.EndpointSlice { + return &discovery.EndpointSlice{ + TypeMeta: metav1.TypeMeta{ + Kind: "EndpointSlice", + APIVersion: "discovery.k8s.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + ResourceVersion: resourceVersion, + }, + Endpoints: endpoints, + } +} + +type wrapResponse struct { + Done chan struct{} + *httptest.ResponseRecorder +} + +func (wr *wrapResponse) Write(buf []byte) (int, error) { + l, err := wr.ResponseRecorder.Write(buf) + wr.Done <- struct{}{} + return l, err +} + +func TestShareProxy_ServeHTTP_LIST(t *testing.T) { + for _, tc := range []struct { + tName string + filterManager filter.FilterFinder + url string + expectedEndPointSliceList *discovery.EndpointSliceList + err error + }{ + { + "test list endpoint slices no filter", + &ctesting.EmptyFilterManager{}, + "/apis/discovery.k8s.io/v1/endpointslices", + expectEndpointSliceListNoFilter(), + + nil, + }, + { + "test list endpoint slice with filter", + &ctesting.FakeEndpointSliceFilter{ + NodeName: "node1", + }, + "/apis/discovery.k8s.io/v1/endpointslices", + expectEndpointSliceListWithFilter(), + nil, + }, + { + "test list endpoint slice with namespace", + &ctesting.FakeEndpointSliceFilter{ + NodeName: "node1", + }, + "/apis/discovery.k8s.io/v1/namespaces/default/endpointslices", + expectEndpointSliceListWithNamespace(), + nil, + }, + } { + t.Run(tc.tName, func(t *testing.T) { + w := &httptest.ResponseRecorder{ + Body: &bytes.Buffer{}, + } + + sp, err := NewMultiplexerProxy(tc.filterManager, + multiplexer.NewFakeCacheManager(mockCacheMap(), mockResourceCacheMap()), + []schema.GroupVersionResource{endpointSliceGVR}, + make(<-chan struct{})) + + assert.Equal(t, tc.err, err) + + sp.ServeHTTP(w, newEndpointSliceListRequest(tc.url)) + + assert.Equal(t, string(encodeEndpointSliceList(tc.expectedEndPointSliceList)), w.Body.String()) + }) + } +} + +func expectEndpointSliceListNoFilter() *discovery.EndpointSliceList { + return &discovery.EndpointSliceList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "100", + }, + Items: []discovery.EndpointSlice{ + *newEndpointSlice(metav1.NamespaceSystem, "coredns-12345", "", mockEndpoints), + *newEndpointSlice(metav1.NamespaceDefault, "nginx", "", mockEndpoints), + }, + } +} + +func newStringPointer(str string) *string { + return &str +} + +func expectEndpointSliceListWithFilter() *discovery.EndpointSliceList { + endpoints := []discovery.Endpoint{ + { + Addresses: []string{"192.168.1.1"}, + NodeName: newStringPointer("node2"), + }, + { + Addresses: []string{"192.168.2.3"}, + NodeName: newStringPointer("node3"), + }, + } + + return &discovery.EndpointSliceList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "100", + }, + Items: []discovery.EndpointSlice{ + *newEndpointSlice(metav1.NamespaceSystem, "coredns-12345", "", endpoints), + *newEndpointSlice(metav1.NamespaceDefault, "nginx", "", endpoints), + }, + } +} + +func expectEndpointSliceListWithNamespace() *discovery.EndpointSliceList { + endpoints := []discovery.Endpoint{ + { + Addresses: []string{"192.168.1.1"}, + NodeName: newStringPointer("node2"), + }, + { + Addresses: []string{"192.168.2.3"}, + NodeName: newStringPointer("node3"), + }, + } + + return &discovery.EndpointSliceList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "100", + }, + Items: []discovery.EndpointSlice{ + *newEndpointSlice(metav1.NamespaceDefault, "nginx", "", endpoints), + }, + } +} + +func newEndpointSliceListRequest(url string) *http.Request { + req := httptest.NewRequest("GET", url, &bytes.Buffer{}) + + ctx := req.Context() + req = req.WithContext(request.WithRequestInfo(ctx, resolverRequestInfo(req))) + + return req +} + +func resolverRequestInfo(req *http.Request) *request.RequestInfo { + cfg := &server.Config{ + LegacyAPIGroupPrefixes: sets.NewString(server.DefaultLegacyAPIPrefix), + } + resolver := server.NewRequestInfoResolver(cfg) + info, _ := resolver.NewRequestInfo(req) + return info +} + +func encodeEndpointSliceList(endpointSliceList *discovery.EndpointSliceList) []byte { + discoveryv1Codec := scheme.Codecs.CodecForVersions(scheme.Codecs.LegacyCodec(discoveryGV), scheme.Codecs.UniversalDecoder(discoveryGV), discoveryGV, discoveryGV) + + str := runtime.EncodeOrDie(discoveryv1Codec, endpointSliceList) + return []byte(str) +} + +func TestShareProxy_ServeHTTP_WATCH(t *testing.T) { + for _, tc := range []struct { + tName string + filterManager filter.FilterFinder + url string + expectedWatchEvent *metav1.WatchEvent + Err error + }{ + {"test watch endpointslice no filter", + &ctesting.EmptyFilterManager{}, + "/apis/discovery.k8s.io/v1/endpointslices?watch=true&&resourceVersion=0&&timeoutSeconds=3", + expectedWatchEventNoFilter(), + nil, + }, + {"test watch endpointslice with filter", + &ctesting.FakeEndpointSliceFilter{ + NodeName: "node1", + }, + "/apis/discovery.k8s.io/v1/endpointslices?watch=true&&resourceVersion=0&&timeoutSeconds=3", + expectedWatchEventWithFilter(), + nil, + }, + } { + t.Run(tc.tName, func(t *testing.T) { + fcm := multiplexer.NewFakeCacheManager(mockCacheMap(), mockResourceCacheMap()) + + sp, _ := NewMultiplexerProxy( + tc.filterManager, + fcm, + []schema.GroupVersionResource{endpointSliceGVR}, + make(<-chan struct{}), + ) + + req := newWatchEndpointSliceRequest(tc.url) + w := newWatchResponse() + + go func() { + sp.ServeHTTP(w, req) + }() + generateWatchEvent(fcm) + + assertWatchResp(t, tc.expectedWatchEvent, w) + }) + } +} + +func expectedWatchEventNoFilter() *metav1.WatchEvent { + return &metav1.WatchEvent{ + Type: "ADDED", + Object: runtime.RawExtension{ + Object: newEndpointSlice(metav1.NamespaceSystem, "coredns-23456", "101", mockEndpoints), + }, + } +} + +func expectedWatchEventWithFilter() *metav1.WatchEvent { + endpoints := []discovery.Endpoint{ + { + Addresses: []string{"192.168.1.1"}, + NodeName: newStringPointer("node2"), + }, + { + Addresses: []string{"192.168.2.3"}, + NodeName: newStringPointer("node3"), + }, + } + return &metav1.WatchEvent{ + Type: "ADDED", + Object: runtime.RawExtension{ + Object: newEndpointSlice(metav1.NamespaceSystem, "coredns-23456", "101", endpoints), + }, + } +} + +func newWatchEndpointSliceRequest(url string) *http.Request { + req := httptest.NewRequest("GET", url, &bytes.Buffer{}) + + ctx := req.Context() + req = req.WithContext(request.WithRequestInfo(ctx, resolverRequestInfo(req))) + + return req +} + +func newWatchResponse() *wrapResponse { + return &wrapResponse{ + make(chan struct{}), + &httptest.ResponseRecorder{ + Body: &bytes.Buffer{}, + }, + } +} + +func generateWatchEvent(fcm *multiplexer.FakeCacheManager) { + fs, _, _ := fcm.ResourceCache(&endpointSliceGVR) + + fess, _ := fs.(*storage.FakeEndpointSliceStorage) + fess.AddWatchObject(newEndpointSlice(metav1.NamespaceSystem, "coredns-23456", "102", mockEndpoints)) +} + +func assertWatchResp(t testing.TB, expectedWatchEvent *metav1.WatchEvent, w *wrapResponse) { + t.Helper() + + select { + case <-time.After(5 * time.Second): + t.Errorf("wait watch timeout") + case <-w.Done: + assert.Equal(t, string(encodeWatchEventList(expectedWatchEvent)), w.Body.String()) + } +} + +func encodeWatchEventList(watchEvent *metav1.WatchEvent) []byte { + metav1Codec := scheme.Codecs.CodecForVersions(scheme.Codecs.LegacyCodec(discoveryGV), scheme.Codecs.UniversalDecoder(discoveryGV), discoveryGV, discoveryGV) + + str := runtime.EncodeOrDie(metav1Codec, watchEvent) + return []byte(str) +} diff --git a/pkg/yurthub/proxy/multiplexer/multiplexerwatch.go b/pkg/yurthub/proxy/multiplexer/multiplexerwatch.go new file mode 100644 index 00000000000..f90ba22b444 --- /dev/null +++ b/pkg/yurthub/proxy/multiplexer/multiplexerwatch.go @@ -0,0 +1,246 @@ +/* +Copyright 2024 The OpenYurt Authors. +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import ( + "context" + "fmt" + "math/rand" + "net/http" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" + metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/validation" + metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/endpoints/handlers" + "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/klog/v2" + + "github.com/openyurtio/openyurt/pkg/yurthub/util" +) + +const ( + minRequestTimeout = 300 * time.Second +) + +var neverExitWatch <-chan time.Time = make(chan time.Time) + +// realTimeoutFactory implements timeoutFactory +type realTimeoutFactory struct { + timeout time.Duration +} + +// TimeoutCh returns a channel which will receive something when the watch times out, +// and a cleanup function to call when this happens. +func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) { + if w.timeout == 0 { + return neverExitWatch, func() bool { return false } + } + t := time.NewTimer(w.timeout) + return t.C, t.Stop +} + +func (sp *multiplexerProxy) multiplexerWatch(w http.ResponseWriter, r *http.Request, gvr *schema.GroupVersionResource) { + reqScope, err := sp.getReqScope(gvr) + if err != nil { + util.Err(err, w, r) + return + } + + listOpts, err := sp.decodeListOptions(r, reqScope) + if err != nil { + util.Err(err, w, r) + return + } + + storageOpts, err := sp.storageOpts(listOpts, gvr) + if err != nil { + util.Err(err, w, r) + } + + timeout := getTimeout(&listOpts) + ctx, cancel := context.WithTimeout(r.Context(), timeout) + defer cancel() + + outputMediaType, _, err := negotiation.NegotiateOutputMediaType(r, reqScope.Serializer, reqScope) + if err != nil { + util.Err(err, w, r) + return + } + + rc, _, err := sp.requestsMultiplexerManager.ResourceCache(gvr) + if err != nil { + util.Err(err, w, r) + return + } + + key, err := sp.getCacheKey(r, storageOpts) + if err != nil { + util.Err(err, w, r) + return + } + + watcher, err := rc.Watch(ctx, key, *storageOpts) + if err != nil { + util.Err(err, w, r) + return + } + + klog.V(3).InfoS("Starting watch", "path", r.URL.Path, "resourceVersion", listOpts.ResourceVersion, "labels", listOpts.LabelSelector, "fields", listOpts.FieldSelector, "timeout", timeout) + serveWatch(newFilterWatch(watcher, sp.filterMgr.FindObjectFilters(r)), reqScope, outputMediaType, r, w, timeout) +} + +func getTimeout(opts *metainternalversion.ListOptions) time.Duration { + timeout := time.Duration(0) + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + if timeout == 0 && minRequestTimeout > 0 { + timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0)) + } + return timeout +} + +func serveWatch(watcher watch.Interface, scope *handlers.RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration) { + defer watcher.Stop() + + handler, err := serveWatchHandler(watcher, scope, mediaTypeOptions, req, timeout) + if err != nil { + util.Err(err, w, req) + return + } + + handler.ServeHTTP(w, req) +} + +func serveWatchHandler(watcher watch.Interface, scope *handlers.RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, timeout time.Duration) (http.Handler, error) { + + options, err := optionsForTransform(mediaTypeOptions, req) + if err != nil { + return nil, errors.NewInternalError(fmt.Errorf("failed to get options from transform, error: %v", err)) + } + + // negotiate for the stream serializer from the scope's serializer + serializer, err := negotiation.NegotiateOutputMediaTypeStream(req, scope.Serializer, scope) + if err != nil { + return nil, errors.NewInternalError(fmt.Errorf("failed to get output media type stream, error: %v", err)) + } + + framer := serializer.StreamSerializer.Framer + streamSerializer := serializer.StreamSerializer.Serializer + encoder := scope.Serializer.EncoderForVersion(streamSerializer, scope.Kind.GroupVersion()) + useTextFraming := serializer.EncodesAsText + if framer == nil { + return nil, errors.NewInternalError(fmt.Errorf("no framer defined for %q available for embedded encoding", serializer.MediaType)) + } + // TODO: next step, get back mediaTypeOptions from negotiate and return the exact value here + mediaType := serializer.MediaType + if mediaType != runtime.ContentTypeJSON { + mediaType += ";stream=watch" + } + + // locate the appropriate embedded encoder based on the transform + var embeddedEncoder runtime.Encoder + contentKind, contentSerializer, transform := targetEncodingForTransform(scope, mediaTypeOptions, req) + if transform { + info, ok := runtime.SerializerInfoForMediaType(contentSerializer.SupportedMediaTypes(), serializer.MediaType) + if !ok { + return nil, errors.NewInternalError(fmt.Errorf("no encoder for %q exists in the requested target %#v", serializer.MediaType, contentSerializer)) + } + embeddedEncoder = contentSerializer.EncoderForVersion(info.Serializer, contentKind.GroupVersion()) + } else { + embeddedEncoder = scope.Serializer.EncoderForVersion(serializer.Serializer, contentKind.GroupVersion()) + } + + var memoryAllocator runtime.MemoryAllocator + + if encoderWithAllocator, supportsAllocator := embeddedEncoder.(runtime.EncoderWithAllocator); supportsAllocator { + // don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call. + // instead, we allocate the buffer for the entire watch session and release it when we close the connection. + memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator) + embeddedEncoder = runtime.NewEncoderWithAllocator(encoderWithAllocator, memoryAllocator) + } + + var tableOptions *metav1.TableOptions + if options != nil { + if passedOptions, ok := options.(*metav1.TableOptions); ok { + tableOptions = passedOptions + } else { + return nil, errors.NewInternalError(fmt.Errorf("unexpected options type: %T", options)) + } + } + embeddedEncoder = newWatchEmbeddedEncoder(req.Context(), embeddedEncoder, mediaTypeOptions.Convert, tableOptions, scope) + + var serverShuttingDownCh <-chan struct{} + if signals := request.ServerShutdownSignalFrom(req.Context()); signals != nil { + serverShuttingDownCh = signals.ShuttingDown() + } + + server := &handlers.WatchServer{ + Watching: watcher, + Scope: scope, + + UseTextFraming: useTextFraming, + MediaType: mediaType, + Framer: framer, + Encoder: encoder, + EmbeddedEncoder: embeddedEncoder, + + TimeoutFactory: &realTimeoutFactory{timeout}, + ServerShuttingDownCh: serverShuttingDownCh, + } + + return http.HandlerFunc(server.HandleHTTP), nil +} + +func optionsForTransform(mediaType negotiation.MediaTypeOptions, req *http.Request) (interface{}, error) { + switch target := mediaType.Convert; { + case target == nil: + case target.Kind == "Table" && (target.GroupVersion() == metav1beta1.SchemeGroupVersion || target.GroupVersion() == metav1.SchemeGroupVersion): + opts := &metav1.TableOptions{} + if err := metainternalversionscheme.ParameterCodec.DecodeParameters(req.URL.Query(), metav1.SchemeGroupVersion, opts); err != nil { + return nil, err + } + switch errs := validation.ValidateTableOptions(opts); len(errs) { + case 0: + return opts, nil + case 1: + return nil, errors.NewBadRequest(fmt.Sprintf("Unable to convert to Table as requested: %v", errs[0].Error())) + default: + return nil, errors.NewBadRequest(fmt.Sprintf("Unable to convert to Table as requested: %v", errs)) + } + } + return nil, nil +} + +func targetEncodingForTransform(scope *handlers.RequestScope, mediaType negotiation.MediaTypeOptions, req *http.Request) (schema.GroupVersionKind, runtime.NegotiatedSerializer, bool) { + switch target := mediaType.Convert; { + case target == nil: + case (target.Kind == "PartialObjectMetadata" || target.Kind == "PartialObjectMetadataList" || target.Kind == "Table") && + (target.GroupVersion() == metav1beta1.SchemeGroupVersion || target.GroupVersion() == metav1.SchemeGroupVersion): + return *target, metainternalversionscheme.Codecs, true + } + return scope.Kind, scope.Serializer, false +} diff --git a/pkg/yurthub/proxy/multiplexer/testing/fake_endpointslicesfilter.go b/pkg/yurthub/proxy/multiplexer/testing/fake_endpointslicesfilter.go new file mode 100644 index 00000000000..e6be9b9952b --- /dev/null +++ b/pkg/yurthub/proxy/multiplexer/testing/fake_endpointslicesfilter.go @@ -0,0 +1,55 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + discovery "k8s.io/api/discovery/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" +) + +type IgnoreEndpointslicesWithNodeName struct { + IgnoreNodeName string +} + +func (ie *IgnoreEndpointslicesWithNodeName) Name() string { + return "ignoreendpointsliceswithname" +} + +func (ie *IgnoreEndpointslicesWithNodeName) SupportedResourceAndVerbs() map[string]sets.Set[string] { + return nil +} + +// Filter is used for filtering runtime object +// all filter logic should be located in it. +func (ie *IgnoreEndpointslicesWithNodeName) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object { + endpointslice, ok := obj.(*discovery.EndpointSlice) + if !ok { + return obj + } + + var newEps []discovery.Endpoint + + for _, ep := range endpointslice.Endpoints { + if *ep.NodeName != ie.IgnoreNodeName { + newEps = append(newEps, ep) + } + } + endpointslice.Endpoints = newEps + + return endpointslice +} diff --git a/pkg/yurthub/proxy/multiplexer/testing/fake_filtermanager.go b/pkg/yurthub/proxy/multiplexer/testing/fake_filtermanager.go new file mode 100644 index 00000000000..26f7489ccc5 --- /dev/null +++ b/pkg/yurthub/proxy/multiplexer/testing/fake_filtermanager.go @@ -0,0 +1,48 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "net/http" + + "github.com/openyurtio/openyurt/pkg/yurthub/filter" +) + +type EmptyFilterManager struct { +} + +func (fm *EmptyFilterManager) FindResponseFilter(req *http.Request) (filter.ResponseFilter, bool) { + return nil, false +} + +func (fm *EmptyFilterManager) FindObjectFilters(req *http.Request) filter.ObjectFilter { + return nil +} + +type FakeEndpointSliceFilter struct { + NodeName string +} + +func (fm *FakeEndpointSliceFilter) FindResponseFilter(req *http.Request) (filter.ResponseFilter, bool) { + return nil, false +} + +func (fm *FakeEndpointSliceFilter) FindObjectFilters(req *http.Request) filter.ObjectFilter { + return &IgnoreEndpointslicesWithNodeName{ + fm.NodeName, + } +} diff --git a/pkg/yurthub/proxy/multiplexer/testing/mock_cacheableobject.go b/pkg/yurthub/proxy/multiplexer/testing/mock_cacheableobject.go new file mode 100644 index 00000000000..7c954601493 --- /dev/null +++ b/pkg/yurthub/proxy/multiplexer/testing/mock_cacheableobject.go @@ -0,0 +1,44 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "io" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +type MockCacheableObject struct { + Obj runtime.Object +} + +func (mc *MockCacheableObject) CacheEncode(id runtime.Identifier, encode func(runtime.Object, io.Writer) error, w io.Writer) error { + return nil +} + +func (mc *MockCacheableObject) GetObject() runtime.Object { + return mc.Obj +} + +func (mc *MockCacheableObject) GetObjectKind() schema.ObjectKind { + return nil +} + +func (mc *MockCacheableObject) DeepCopyObject() runtime.Object { + return mc.Obj +} diff --git a/pkg/yurthub/proxy/multiplexer/watchembeddedencoder.go b/pkg/yurthub/proxy/multiplexer/watchembeddedencoder.go new file mode 100644 index 00000000000..8ac19ecb3c7 --- /dev/null +++ b/pkg/yurthub/proxy/multiplexer/watchembeddedencoder.go @@ -0,0 +1,303 @@ +/* +Copyright 2024 The OpenYurt Authors. +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "reflect" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apiserver/pkg/endpoints/handlers" + "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" + "k8s.io/klog/v2" +) + +// watchEmbeddedEncoder performs encoding of the embedded object. +// +// NOTE: watchEmbeddedEncoder is NOT thread-safe. +type watchEmbeddedEncoder struct { + encoder runtime.Encoder + + ctx context.Context + + // target, if non-nil, configures transformation type. + // The other options are ignored if target is nil. + target *schema.GroupVersionKind + tableOptions *metav1.TableOptions + scope *handlers.RequestScope + + // identifier of the encoder, computed lazily + identifier runtime.Identifier +} + +func newWatchEmbeddedEncoder(ctx context.Context, encoder runtime.Encoder, target *schema.GroupVersionKind, tableOptions *metav1.TableOptions, scope *handlers.RequestScope) *watchEmbeddedEncoder { + return &watchEmbeddedEncoder{ + encoder: encoder, + ctx: ctx, + target: target, + tableOptions: tableOptions, + scope: scope, + } +} + +// Encode implements runtime.Encoder interface. +func (e *watchEmbeddedEncoder) Encode(obj runtime.Object, w io.Writer) error { + if co, ok := obj.(runtime.CacheableObject); ok { + return co.CacheEncode(e.Identifier(), e.doEncode, w) + } + return e.doEncode(obj, w) +} + +func (e *watchEmbeddedEncoder) doEncode(obj runtime.Object, w io.Writer) error { + result, err := doTransformObject(e.ctx, obj, e.tableOptions, e.target, e.scope) + if err != nil { + utilruntime.HandleError(fmt.Errorf("failed to transform object %v: %v", reflect.TypeOf(obj), err)) + result = obj + } + + // When we are transforming to a table, use the original table options when + // we should print headers only on the first object - headers should be + // omitted on subsequent events. + if e.tableOptions != nil && !e.tableOptions.NoHeaders { + e.tableOptions.NoHeaders = true + // With options change, we should recompute the identifier. + // Clearing this will trigger lazy recompute when needed. + e.identifier = "" + } + + return e.encoder.Encode(result, w) +} + +// Identifier implements runtime.Encoder interface. +func (e *watchEmbeddedEncoder) Identifier() runtime.Identifier { + if e.identifier == "" { + e.identifier = e.embeddedIdentifier() + } + return e.identifier +} + +type watchEmbeddedEncoderIdentifier struct { + Name string `json:"name,omitempty"` + Encoder string `json:"encoder,omitempty"` + Target string `json:"target,omitempty"` + Options metav1.TableOptions `json:"options,omitempty"` + NoHeaders bool `json:"noHeaders,omitempty"` +} + +func (e *watchEmbeddedEncoder) embeddedIdentifier() runtime.Identifier { + if e.target == nil { + // If no conversion is performed, we effective only use + // the embedded identifier. + return e.encoder.Identifier() + } + identifier := watchEmbeddedEncoderIdentifier{ + Name: "watch-embedded", + Encoder: string(e.encoder.Identifier()), + Target: e.target.String(), + } + if e.target.Kind == "Table" && e.tableOptions != nil { + identifier.Options = *e.tableOptions + identifier.NoHeaders = e.tableOptions.NoHeaders + } + + result, err := json.Marshal(identifier) + if err != nil { + klog.Fatalf("Failed marshaling identifier for watchEmbeddedEncoder: %v", err) + } + return runtime.Identifier(result) +} + +// doTransformResponseObject is used for handling all requests, including watch. +func doTransformObject(ctx context.Context, obj runtime.Object, opts interface{}, target *schema.GroupVersionKind, scope *handlers.RequestScope) (runtime.Object, error) { + if _, ok := obj.(*metav1.Status); ok { + return obj, nil + } + + switch { + case target == nil: + // If we ever change that from a no-op, the identifier of + // the watchEmbeddedEncoder has to be adjusted accordingly. + return obj, nil + + case target.Kind == "PartialObjectMetadata": + return asPartialObjectMetadata(obj, target.GroupVersion()) + + case target.Kind == "PartialObjectMetadataList": + return asPartialObjectMetadataList(obj, target.GroupVersion()) + + case target.Kind == "Table": + options, ok := opts.(*metav1.TableOptions) + if !ok { + return nil, fmt.Errorf("unexpected TableOptions, got %T", opts) + } + return asTable(ctx, obj, options, scope, target.GroupVersion()) + + default: + accepted, _ := negotiation.MediaTypesForSerializer(metainternalversionscheme.Codecs) + err := negotiation.NewNotAcceptableError(accepted) + return nil, err + } +} + +func asTable(ctx context.Context, result runtime.Object, opts *metav1.TableOptions, scope *handlers.RequestScope, groupVersion schema.GroupVersion) (runtime.Object, error) { + switch groupVersion { + case metav1beta1.SchemeGroupVersion, metav1.SchemeGroupVersion: + default: + return nil, newNotAcceptableError(fmt.Sprintf("no Table exists in group version %s", groupVersion)) + } + + obj, err := scope.TableConvertor.ConvertToTable(ctx, result, opts) + if err != nil { + return nil, err + } + + table := (*metav1.Table)(obj) + + for i := range table.Rows { + item := &table.Rows[i] + switch opts.IncludeObject { + case metav1.IncludeObject: + item.Object.Object, err = scope.Convertor.ConvertToVersion(item.Object.Object, scope.Kind.GroupVersion()) + if err != nil { + return nil, err + } + // TODO: rely on defaulting for the value here? + case metav1.IncludeMetadata, "": + m, err := meta.Accessor(item.Object.Object) + if err != nil { + return nil, err + } + // TODO: turn this into an internal type and do conversion in order to get object kind automatically set? + partial := meta.AsPartialObjectMetadata(m) + partial.GetObjectKind().SetGroupVersionKind(groupVersion.WithKind("PartialObjectMetadata")) + item.Object.Object = partial + case metav1.IncludeNone: + item.Object.Object = nil + default: + err = errors.NewBadRequest(fmt.Sprintf("unrecognized includeObject value: %q", opts.IncludeObject)) + return nil, err + } + } + + return table, nil +} + +// errNotAcceptable indicates Accept negotiation has failed +type errNotAcceptable struct { + message string +} + +func newNotAcceptableError(message string) error { + return errNotAcceptable{message} +} + +func (e errNotAcceptable) Error() string { + return e.message +} + +func (e errNotAcceptable) Status() metav1.Status { + return metav1.Status{ + Status: metav1.StatusFailure, + Code: http.StatusNotAcceptable, + Reason: metav1.StatusReason("NotAcceptable"), + Message: e.Error(), + } +} + +func asPartialObjectMetadata(result runtime.Object, groupVersion schema.GroupVersion) (runtime.Object, error) { + if meta.IsListType(result) { + err := newNotAcceptableError(fmt.Sprintf("you requested PartialObjectMetadata, but the requested object is a list (%T)", result)) + return nil, err + } + switch groupVersion { + case metav1beta1.SchemeGroupVersion, metav1.SchemeGroupVersion: + default: + return nil, newNotAcceptableError(fmt.Sprintf("no PartialObjectMetadataList exists in group version %s", groupVersion)) + } + m, err := meta.Accessor(result) + if err != nil { + return nil, err + } + partial := meta.AsPartialObjectMetadata(m) + partial.GetObjectKind().SetGroupVersionKind(groupVersion.WithKind("PartialObjectMetadata")) + return partial, nil +} + +func asPartialObjectMetadataList(result runtime.Object, groupVersion schema.GroupVersion) (runtime.Object, error) { + li, ok := result.(metav1.ListInterface) + if !ok { + return nil, newNotAcceptableError(fmt.Sprintf("you requested PartialObjectMetadataList, but the requested object is not a list (%T)", result)) + } + + gvk := groupVersion.WithKind("PartialObjectMetadata") + switch { + case groupVersion == metav1beta1.SchemeGroupVersion: + list := &metav1beta1.PartialObjectMetadataList{} + err := meta.EachListItem(result, func(obj runtime.Object) error { + m, err := meta.Accessor(obj) + if err != nil { + return err + } + partial := meta.AsPartialObjectMetadata(m) + partial.GetObjectKind().SetGroupVersionKind(gvk) + list.Items = append(list.Items, *partial) + return nil + }) + if err != nil { + return nil, err + } + list.ResourceVersion = li.GetResourceVersion() + list.Continue = li.GetContinue() + list.RemainingItemCount = li.GetRemainingItemCount() + return list, nil + + case groupVersion == metav1.SchemeGroupVersion: + list := &metav1.PartialObjectMetadataList{} + err := meta.EachListItem(result, func(obj runtime.Object) error { + m, err := meta.Accessor(obj) + if err != nil { + return err + } + partial := meta.AsPartialObjectMetadata(m) + partial.GetObjectKind().SetGroupVersionKind(gvk) + list.Items = append(list.Items, *partial) + return nil + }) + if err != nil { + return nil, err + } + list.ResourceVersion = li.GetResourceVersion() + list.Continue = li.GetContinue() + list.RemainingItemCount = li.GetRemainingItemCount() + return list, nil + + default: + return nil, newNotAcceptableError(fmt.Sprintf("no PartialObjectMetadataList exists in group version %s", groupVersion)) + } +} diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go index 7c791e7a129..2fae2998b63 100644 --- a/pkg/yurthub/proxy/proxy.go +++ b/pkg/yurthub/proxy/proxy.go @@ -19,6 +19,7 @@ package proxy import ( "bytes" "errors" + "fmt" "io" "net/http" "net/url" @@ -40,6 +41,7 @@ import ( hubrest "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/autonomy" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/local" + "github.com/openyurtio/openyurt/pkg/yurthub/proxy/multiplexer" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/pool" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/remote" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/util" @@ -50,6 +52,8 @@ import ( coordinatorconstants "github.com/openyurtio/openyurt/pkg/yurthub/yurtcoordinator/constants" ) +const multiplexerProxyPostHookName = "multiplexerProxy" + type yurtReverseProxy struct { resolver apirequest.RequestInfoResolver loadBalancer remote.LoadBalancer @@ -63,6 +67,9 @@ type yurtReverseProxy struct { isCoordinatorReady func() bool workingMode hubutil.WorkingMode enableYurtCoordinator bool + multiplexerProxy http.Handler + multiplexerResources []schema.GroupVersionResource + nodeName string } // NewYurtReverseProxyHandler creates a http handler for proxying @@ -158,6 +165,21 @@ func NewYurtReverseProxyHandler( enableYurtCoordinator: yurtHubCfg.EnableCoordinator, tenantMgr: tenantMgr, workingMode: yurtHubCfg.WorkingMode, + multiplexerResources: yurtHubCfg.MultiplexerResources, + nodeName: yurtHubCfg.NodeName, + } + + if yurtHubCfg.PostStartHooks == nil { + yurtHubCfg.PostStartHooks = make(map[string]func() error) + } + yurtHubCfg.PostStartHooks[multiplexerProxyPostHookName] = func() error { + if yurtProxy.multiplexerProxy, err = multiplexer.NewMultiplexerProxy(yurtHubCfg.FilterManager, + yurtHubCfg.RequestMultiplexerManager, + yurtHubCfg.MultiplexerResources, + stopCh); err != nil { + return fmt.Errorf("failed to new default share proxy, error: %v", err) + } + return nil } return yurtProxy.buildHandlerChain(yurtProxy), nil @@ -214,6 +236,10 @@ func (p *yurtReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) p.poolScopedResourceHandler(rw, req) case util.IsSubjectAccessReviewCreateGetRequest(req): p.subjectAccessReviewHandler(rw, req) + case util.IsMultiplexerRequest(req, p.multiplexerResources, p.nodeName): + if p.multiplexerProxy != nil { + p.multiplexerProxy.ServeHTTP(rw, req) + } default: // For resource request that do not need to be handled by yurt-coordinator, // handling the request with cloud apiserver or local cache. diff --git a/pkg/yurthub/proxy/util/util.go b/pkg/yurthub/proxy/util/util.go index dca98212e14..9cc214e19a9 100644 --- a/pkg/yurthub/proxy/util/util.go +++ b/pkg/yurthub/proxy/util/util.go @@ -574,3 +574,37 @@ func ReListWatchReq(rw http.ResponseWriter, req *http.Request) { klog.Infof("this request write error event back finished.") rw.(http.Flusher).Flush() } + +func IsMultiplexerRequest(req *http.Request, multiplexerResources []schema.GroupVersionResource, nodeName string) bool { + ctx := req.Context() + + if req.UserAgent() == util.MultiplexerProxyClientUserAgentPrefix+nodeName { + return false + } + + info, ok := apirequest.RequestInfoFrom(ctx) + if !ok { + return false + } + + if info.Verb != "list" && info.Verb != "watch" { + return false + } + + return isMultiplexerResource(info, multiplexerResources) +} + +func isMultiplexerResource(info *apirequest.RequestInfo, multiplexerResources []schema.GroupVersionResource) bool { + gvr := schema.GroupVersionResource{ + Group: info.APIGroup, + Version: info.APIVersion, + Resource: info.Resource, + } + + for _, resource := range multiplexerResources { + if gvr.String() == resource.String() { + return true + } + } + return false +} diff --git a/pkg/yurthub/server/server.go b/pkg/yurthub/server/server.go index 194ca1ca896..ecd851b632a 100644 --- a/pkg/yurthub/server/server.go +++ b/pkg/yurthub/server/server.go @@ -21,6 +21,7 @@ import ( "net/http" "github.com/gorilla/mux" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus/promhttp" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" @@ -41,6 +42,7 @@ func RunYurtHubServers(cfg *config.YurtHubConfiguration, proxyHandler http.Handler, rest *rest.RestConfigManager, stopCh <-chan struct{}) error { + hubServerHandler := mux.NewRouter() registerHandlers(hubServerHandler, cfg, rest) @@ -73,6 +75,11 @@ func RunYurtHubServers(cfg *config.YurtHubConfiguration, } } + for name, hook := range cfg.PostStartHooks { + if err := hook(); err != nil { + return errors.Wrapf(err, "failed to run post start hooks: %s", name) + } + } return nil } diff --git a/pkg/yurthub/util/util.go b/pkg/yurthub/util/util.go index 79820689593..9e2346b89d6 100644 --- a/pkg/yurthub/util/util.go +++ b/pkg/yurthub/util/util.go @@ -86,6 +86,8 @@ const ( CacheUserAgentsKey = "cache_agents" PoolScopeResourcesKey = "pool_scope_resources" + MultiplexerProxyClientUserAgentPrefix = "multiplexer-proxy-" + YurtHubProxyPort = 10261 YurtHubPort = 10267 YurtHubProxySecurePort = 10268 diff --git a/pkg/yurthub/yurtcoordinator/coordinator.go b/pkg/yurthub/yurtcoordinator/coordinator.go index 3a36ad3df6e..b502ff4768f 100644 --- a/pkg/yurthub/yurtcoordinator/coordinator.go +++ b/pkg/yurthub/yurtcoordinator/coordinator.go @@ -404,6 +404,7 @@ func (coordinator *coordinator) buildPoolCacheStore() (cachemanager.CacheManager } poolCacheManager := cachemanager.NewCacheManager( + "", cachemanager.NewStorageWrapper(etcdStore), coordinator.serializerMgr, coordinator.restMapperMgr, diff --git a/pkg/yurtmanager/controller/util/node/controller_utils.go b/pkg/yurtmanager/controller/util/node/controller_utils.go index fb9ad0eb48b..58408538ba4 100644 --- a/pkg/yurtmanager/controller/util/node/controller_utils.go +++ b/pkg/yurtmanager/controller/util/node/controller_utils.go @@ -23,7 +23,7 @@ import ( "fmt" "time" - v1 "k8s.io/api/core/v1" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -69,12 +69,12 @@ var UpdateLabelBackoff = wait.Backoff{ // DeletePods will delete all pods from master running on given node, // and return true if any pods were deleted, or were found pending // deletion. -func DeletePods(ctx context.Context, c client.Client, pods []*v1.Pod, recorder record.EventRecorder, nodeName, nodeUID string) (bool, error) { +func DeletePods(ctx context.Context, c client.Client, pods []*corev1.Pod, recorder record.EventRecorder, nodeName, nodeUID string) (bool, error) { remaining := false var updateErrList []error if len(pods) > 0 { - RecordNodeEvent(ctx, recorder, nodeName, nodeUID, v1.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName)) + RecordNodeEvent(ctx, recorder, nodeName, nodeUID, corev1.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName)) } for i := range pods { @@ -100,7 +100,7 @@ func DeletePods(ctx context.Context, c client.Client, pods []*v1.Pod, recorder r } klog.InfoS("Starting deletion of pod", "pod", klog.KObj(pod)) - recorder.Eventf(pod, v1.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName) + recorder.Eventf(pod, corev1.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName) //if err := kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil { if err := c.Delete(ctx, pod); err != nil { if apierrors.IsNotFound(err) { @@ -122,7 +122,7 @@ func DeletePods(ctx context.Context, c client.Client, pods []*v1.Pod, recorder r // SetPodTerminationReason attempts to set a reason and message in the // pod status, updates it in the apiserver, and returns an error if it // encounters one. -func SetPodTerminationReason(ctx context.Context, c client.Client, pod *v1.Pod, nodeName string) (*v1.Pod, error) { +func SetPodTerminationReason(ctx context.Context, c client.Client, pod *corev1.Pod, nodeName string) (*corev1.Pod, error) { if pod.Status.Reason == NodeUnreachablePodReason { return pod, nil } @@ -140,7 +140,7 @@ func SetPodTerminationReason(ctx context.Context, c client.Client, pod *v1.Pod, // MarkPodsNotReady updates ready status of given pods running on // given node from master return true if success -func MarkPodsNotReady(ctx context.Context, c client.Client, recorder record.EventRecorder, pods []*v1.Pod, nodeName string) error { +func MarkPodsNotReady(ctx context.Context, c client.Client, recorder record.EventRecorder, pods []*corev1.Pod, nodeName string) error { klog.V(2).InfoS("Update ready status of pods on node", "node", klog.KRef("", nodeName)) errs := []error{} @@ -153,11 +153,11 @@ func MarkPodsNotReady(ctx context.Context, c client.Client, recorder record.Even // Pod will be modified, so making copy is required. pod := pods[i].DeepCopy() for _, cond := range pod.Status.Conditions { - if cond.Type != v1.PodReady { + if cond.Type != corev1.PodReady { continue } - cond.Status = v1.ConditionFalse + cond.Status = corev1.ConditionFalse if !utilpod.UpdatePodCondition(&pod.Status, &cond) { break } @@ -174,7 +174,7 @@ func MarkPodsNotReady(ctx context.Context, c client.Client, recorder record.Even errs = append(errs, err) } // record NodeNotReady event after updateStatus to make sure pod still exists - recorder.Event(pod, v1.EventTypeWarning, "NodeNotReady", "Node is not ready") + recorder.Event(pod, corev1.EventTypeWarning, "NodeNotReady", "Node is not ready") break } } @@ -184,7 +184,7 @@ func MarkPodsNotReady(ctx context.Context, c client.Client, recorder record.Even // RecordNodeEvent records a event related to a node. func RecordNodeEvent(ctx context.Context, recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) { - ref := &v1.ObjectReference{ + ref := &corev1.ObjectReference{ APIVersion: "v1", Kind: "Node", Name: nodeName, @@ -196,8 +196,8 @@ func RecordNodeEvent(ctx context.Context, recorder record.EventRecorder, nodeNam } // RecordNodeStatusChange records a event related to a node status change. (Common to lifecycle and ipam) -func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, newStatus string) { - ref := &v1.ObjectReference{ +func RecordNodeStatusChange(recorder record.EventRecorder, node *corev1.Node, newStatus string) { + ref := &corev1.ObjectReference{ APIVersion: "v1", Kind: "Node", Name: node.Name, @@ -207,12 +207,12 @@ func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, newSta klog.V(2).InfoS("Recording status change event message for node", "status", newStatus, "node", node.Name) // TODO: This requires a transaction, either both node status is updated // and event is recorded or neither should happen, see issue #6055. - recorder.Eventf(ref, v1.EventTypeNormal, newStatus, "Node %s status is now: %s", node.Name, newStatus) + recorder.Eventf(ref, corev1.EventTypeNormal, newStatus, "Node %s status is now: %s", node.Name, newStatus) } // SwapNodeControllerTaint returns true in case of success and false // otherwise. -func SwapNodeControllerTaint(ctx context.Context, kubeClient clientset.Interface, taintsToAdd, taintsToRemove []*v1.Taint, node *v1.Node) bool { +func SwapNodeControllerTaint(ctx context.Context, kubeClient clientset.Interface, taintsToAdd, taintsToRemove []*corev1.Taint, node *corev1.Node) bool { for _, taintToAdd := range taintsToAdd { now := metav1.Now() taintToAdd.TimeAdded = &now @@ -247,7 +247,7 @@ func SwapNodeControllerTaint(ctx context.Context, kubeClient clientset.Interface // AddOrUpdateLabelsOnNode updates the labels on the node and returns true on // success and false on failure. -func AddOrUpdateLabelsOnNode(ctx context.Context, kubeClient clientset.Interface, labelsToUpdate map[string]string, node *v1.Node) bool { +func AddOrUpdateLabelsOnNode(ctx context.Context, kubeClient clientset.Interface, labelsToUpdate map[string]string, node *corev1.Node) bool { if err := addOrUpdateLabelsOnNode(kubeClient, node.Name, labelsToUpdate); err != nil { utilruntime.HandleError( fmt.Errorf( @@ -263,7 +263,7 @@ func AddOrUpdateLabelsOnNode(ctx context.Context, kubeClient clientset.Interface // GetNodeCondition extracts the provided condition from the given status and returns that. // Returns nil and -1 if the condition is not present, and the index of the located condition. -func GetNodeCondition(status *v1.NodeStatus, conditionType v1.NodeConditionType) (int, *v1.NodeCondition) { +func GetNodeCondition(status *corev1.NodeStatus, conditionType corev1.NodeConditionType) (int, *corev1.NodeCondition) { if status == nil { return -1, nil } @@ -277,14 +277,14 @@ func GetNodeCondition(status *v1.NodeStatus, conditionType v1.NodeConditionType) // AddOrUpdateTaintOnNode add taints to the node. If taint was added into node, it'll issue API calls // to update nodes; otherwise, no API calls. Return error if any. -func AddOrUpdateTaintOnNode(ctx context.Context, c clientset.Interface, nodeName string, taints ...*v1.Taint) error { +func AddOrUpdateTaintOnNode(ctx context.Context, c clientset.Interface, nodeName string, taints ...*corev1.Taint) error { if len(taints) == 0 { return nil } firstTry := true return clientretry.RetryOnConflict(UpdateTaintBackoff, func() error { var err error - var oldNode *v1.Node + var oldNode *corev1.Node // First we try getting node from the API server cache, as it's cheaper. If it fails // we get it from etcd to be sure to have fresh data. option := metav1.GetOptions{} @@ -297,7 +297,7 @@ func AddOrUpdateTaintOnNode(ctx context.Context, c clientset.Interface, nodeName return err } - var newNode *v1.Node + var newNode *corev1.Node oldNodeCopy := oldNode updated := false for _, taint := range taints { @@ -320,7 +320,7 @@ func AddOrUpdateTaintOnNode(ctx context.Context, c clientset.Interface, nodeName // won't fail if target taint doesn't exist or has been removed. // If passed a node it'll check if there's anything to be done, if taint is not present it won't issue // any API calls. -func RemoveTaintOffNode(ctx context.Context, c clientset.Interface, nodeName string, node *v1.Node, taints ...*v1.Taint) error { +func RemoveTaintOffNode(ctx context.Context, c clientset.Interface, nodeName string, node *corev1.Node, taints ...*corev1.Taint) error { if len(taints) == 0 { return nil } @@ -341,7 +341,7 @@ func RemoveTaintOffNode(ctx context.Context, c clientset.Interface, nodeName str firstTry := true return clientretry.RetryOnConflict(UpdateTaintBackoff, func() error { var err error - var oldNode *v1.Node + var oldNode *corev1.Node // First we try getting node from the API server cache, as it's cheaper. If it fails // we get it from etcd to be sure to have fresh data. option := metav1.GetOptions{} @@ -354,7 +354,7 @@ func RemoveTaintOffNode(ctx context.Context, c clientset.Interface, nodeName str return err } - var newNode *v1.Node + var newNode *corev1.Node oldNodeCopy := oldNode updated := false for _, taint := range taints { @@ -374,7 +374,7 @@ func RemoveTaintOffNode(ctx context.Context, c clientset.Interface, nodeName str } // PatchNodeTaints patches node's taints. -func PatchNodeTaints(ctx context.Context, c clientset.Interface, nodeName string, oldNode *v1.Node, newNode *v1.Node) error { +func PatchNodeTaints(ctx context.Context, c clientset.Interface, nodeName string, oldNode *corev1.Node, newNode *corev1.Node) error { // Strip base diff node from RV to ensure that our Patch request will set RV to check for conflicts over .spec.taints. // This is needed because .spec.taints does not specify patchMergeKey and patchStrategy and adding them is no longer an option for compatibility reasons. // Using other Patch strategy works for adding new taints, however will not resolve problem with taint removal. @@ -393,7 +393,7 @@ func PatchNodeTaints(ctx context.Context, c clientset.Interface, nodeName string return fmt.Errorf("could not marshal new node %#v for node %q: %v", newNodeClone, nodeName, err) } - patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldDataNoRV, newData, v1.Node{}) + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldDataNoRV, newData, corev1.Node{}) if err != nil { return fmt.Errorf("could not create patch for node %q: %v", nodeName, err) } @@ -406,7 +406,7 @@ func addOrUpdateLabelsOnNode(kubeClient clientset.Interface, nodeName string, la firstTry := true return clientretry.RetryOnConflict(UpdateLabelBackoff, func() error { var err error - var node *v1.Node + var node *corev1.Node // First we try getting node from the API server cache, as it's cheaper. If it fails // we get it from etcd to be sure to have fresh data. option := metav1.GetOptions{} @@ -436,7 +436,7 @@ func addOrUpdateLabelsOnNode(kubeClient clientset.Interface, nodeName string, la if err != nil { return fmt.Errorf("could not marshal the new node %#v: %v", newNode, err) } - patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{}) + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &corev1.Node{}) if err != nil { return fmt.Errorf("could not create a two-way merge patch: %v", err) } @@ -453,7 +453,7 @@ func addOrUpdateLabelsOnNode(kubeClient clientset.Interface, nodeName string, la // - apps.openyurt.io/binding: "true" // - openyurt.beta.io/autonomy: "true" // - openyurt.io/autonomy-duration: "duration" -func IsPodBoundenToNode(node *v1.Node) bool { +func IsPodBoundenToNode(node *corev1.Node) bool { if node.Annotations == nil { return false } diff --git a/test/e2e/autonomy/autonomy.go b/test/e2e/autonomy/autonomy.go index aee2e0bf5d8..0baf0128f15 100644 --- a/test/e2e/autonomy/autonomy.go +++ b/test/e2e/autonomy/autonomy.go @@ -150,6 +150,7 @@ var _ = ginkgo.Describe("edge-autonomy"+constants.YurtE2ENamespaceName, ginkgo.O gomega.Eventually(func() string { opBytes, err := exec.Command("/bin/bash", "-c", "docker exec -t openyurt-e2e-test-worker /bin/bash -c 'curl -m 2 "+NginxServiceIP+"'").CombinedOutput() if err != nil { + klog.Errorf("failed to curl nginx service cluster ip %v", err) return "" } return string(opBytes)