Skip to content

Commit

Permalink
Implement multi plexer cache.
Browse files Browse the repository at this point in the history
  • Loading branch information
zyjhtangtang committed Nov 12, 2024
1 parent 3f1f88b commit 7ba1591
Show file tree
Hide file tree
Showing 13 changed files with 1,255 additions and 10 deletions.
16 changes: 8 additions & 8 deletions pkg/yurthub/filter/servicetopology/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"context"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
discoveryv1 "k8s.io/api/discovery/v1"
discoveryV1beta1 "k8s.io/api/discovery/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -130,7 +130,7 @@ func (stf *serviceTopologyFilter) Filter(obj runtime.Object, stopCh <-chan struc
}

switch v := obj.(type) {
case *v1.Endpoints, *discoveryV1beta1.EndpointSlice, *discovery.EndpointSlice:
case *v1.Endpoints, *discoveryV1beta1.EndpointSlice, *discoveryv1.EndpointSlice:
return stf.serviceTopologyHandler(v)
default:
return obj
Expand Down Expand Up @@ -164,9 +164,9 @@ func (stf *serviceTopologyFilter) resolveServiceTopologyType(obj runtime.Object)
case *discoveryV1beta1.EndpointSlice:
svcNamespace = v.Namespace
svcName = v.Labels[discoveryV1beta1.LabelServiceName]
case *discovery.EndpointSlice:
case *discoveryv1.EndpointSlice:
svcNamespace = v.Namespace
svcName = v.Labels[discovery.LabelServiceName]
svcName = v.Labels[discoveryv1.LabelServiceName]
case *v1.Endpoints:
svcNamespace = v.Namespace
svcName = v.Name
Expand All @@ -190,7 +190,7 @@ func (stf *serviceTopologyFilter) nodeTopologyHandler(obj runtime.Object) runtim
switch v := obj.(type) {
case *discoveryV1beta1.EndpointSlice:
return reassembleV1beta1EndpointSlice(v, stf.nodeName, nil)
case *discovery.EndpointSlice:
case *discoveryv1.EndpointSlice:
return reassembleEndpointSlice(v, stf.nodeName, nil)
case *v1.Endpoints:
return reassembleEndpoints(v, stf.nodeName, nil)
Expand All @@ -215,7 +215,7 @@ func (stf *serviceTopologyFilter) nodePoolTopologyHandler(obj runtime.Object) ru
switch v := obj.(type) {
case *discoveryV1beta1.EndpointSlice:
return reassembleV1beta1EndpointSlice(v, "", nodes)
case *discovery.EndpointSlice:
case *discoveryv1.EndpointSlice:
return reassembleEndpointSlice(v, "", nodes)
case *v1.Endpoints:
return reassembleEndpoints(v, "", nodes)
Expand Down Expand Up @@ -252,13 +252,13 @@ func reassembleV1beta1EndpointSlice(endpointSlice *discoveryV1beta1.EndpointSlic
}

// reassembleEndpointSlice will discard endpoints that are not on the same node/nodePool for v1.EndpointSlice
func reassembleEndpointSlice(endpointSlice *discovery.EndpointSlice, nodeName string, nodes []string) *discovery.EndpointSlice {
func reassembleEndpointSlice(endpointSlice *discoveryv1.EndpointSlice, nodeName string, nodes []string) *discoveryv1.EndpointSlice {
if len(nodeName) != 0 && len(nodes) != 0 {
klog.Warningf("reassembleEndpointSlice: nodeName(%s) and nodePool can not be set at the same time", nodeName)
return endpointSlice
}

var newEps []discovery.Endpoint
var newEps []discoveryv1.Endpoint
for i := range endpointSlice.Endpoints {
if len(nodeName) != 0 {
if *endpointSlice.Endpoints[i].NodeName == nodeName {
Expand Down
74 changes: 74 additions & 0 deletions pkg/yurthub/multiplexer/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Copyright 2024 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package multiplexer

import (
"context"
"fmt"
"sync"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
kstorage "k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher"
"k8s.io/client-go/kubernetes/scheme"
)

type Interface interface {
Watch(ctx context.Context, key string, opts kstorage.ListOptions) (watch.Interface, error)
Get(ctx context.Context, key string, opts kstorage.GetOptions, objPtr runtime.Object) error
GetList(ctx context.Context, key string, opts kstorage.ListOptions, listObj runtime.Object) error
}

type ResourceCacheConfig struct {
KeyFunc func(runtime.Object) (string, error)
NewFunc func() runtime.Object
NewListFunc func() runtime.Object
GetAttrsFunc kstorage.AttrFunc
}

func NewResourceCache(
s kstorage.Interface,
resource *schema.GroupVersionResource,
config *ResourceCacheConfig) (Interface, func(), error) {

cacheConfig := cacher.Config{
Storage: s,
Versioner: kstorage.APIObjectVersioner{},
GroupResource: resource.GroupResource(),
KeyFunc: config.KeyFunc,
NewFunc: config.NewFunc,
NewListFunc: config.NewListFunc,
GetAttrsFunc: config.GetAttrsFunc,
Codec: scheme.Codecs.LegacyCodec(resource.GroupVersion()),
}

cacher, err := cacher.NewCacherFromConfig(cacheConfig)
if err != nil {
return nil, func() {}, fmt.Errorf("failed to new cacher from config, error: %v", err)
}

var once sync.Once
destroyFunc := func() {
once.Do(func() {
cacher.Stop()
})
}

return cacher, destroyFunc, nil
}
156 changes: 156 additions & 0 deletions pkg/yurthub/multiplexer/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
Copyright 2024 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package multiplexer

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"

ystorage "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage"
)

var serviceGVR = &schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "services",
}

var newServiceFunc = func() runtime.Object {
return &v1.Service{}
}

var newServiceListFunc = func() runtime.Object {
return &v1.ServiceList{}
}

func TestResourceCache_GetList(t *testing.T) {
cache, _, err := NewResourceCache(
ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")}),
serviceGVR,
&ResourceCacheConfig{
keyFunc,
newServiceFunc,
newServiceListFunc,
attrsFunc,
},
)

assert.Nil(t, err)
assertCacheGetList(t, cache)
}

func mockListOptions() storage.ListOptions {
return storage.ListOptions{
ResourceVersion: "100",
Recursive: true,
Predicate: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
},
}
}

func assertCacheGetList(t testing.TB, cache Interface) {
t.Helper()

serviceList := &v1.ServiceList{}
err := cache.GetList(context.Background(), "", mockListOptions(), serviceList)

assert.Nil(t, err)
assert.Equal(t, 1, len(serviceList.Items))
}

func TestResourceCache_Watch(t *testing.T) {
fakeStorage := ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")})

cache, _, err := NewResourceCache(
fakeStorage,
serviceGVR,
&ResourceCacheConfig{
keyFunc,
newServiceFunc,
newServiceListFunc,
attrsFunc,
},
)

assert.Nil(t, err)
assertCacheWatch(t, cache, fakeStorage)
}

func mockWatchOptions() storage.ListOptions {
var sendInitialEvents = true

return storage.ListOptions{
ResourceVersion: "100",
Predicate: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
},
Recursive: true,
SendInitialEvents: &sendInitialEvents,
}
}

func assertCacheWatch(t testing.TB, cache Interface, fs *ystorage.FakeServiceStorage) {
receive, err := cache.Watch(context.TODO(), "", mockWatchOptions())

go func() {
fs.AddWatchObject(newService(metav1.NamespaceSystem, "coredns2"))
}()

assert.Nil(t, err)
event := <-receive.ResultChan()
assert.Equal(t, watch.Added, event.Type)
}

func TestResourceCache_Get(t *testing.T) {
cache, _, err := NewResourceCache(
ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")}),
serviceGVR,
&ResourceCacheConfig{
keyFunc,
newServiceFunc,
newServiceListFunc,
attrsFunc,
},
)
assert.Nil(t, err)
assertCacheGet(t, cache)
}

func assertCacheGet(t testing.TB, cache Interface) {
t.Helper()

service := &v1.Service{}
err := cache.Get(context.Background(), "/kube-system/coredns", storage.GetOptions{
ResourceVersion: "1",
}, service)

assert.Nil(t, err)
assert.Equal(t, "coredns", service.Name)
}
Loading

0 comments on commit 7ba1591

Please sign in to comment.