From 877d43e3a10de76cbd2484ac0183efe9720dc065 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BD=97=E6=B3=BD=E8=BD=A9?= Date: Thu, 6 Jun 2024 12:40:47 +0800 Subject: [PATCH] deduplicate update event in the service entry store (#568) Signed-off-by: spacewander --- controller/internal/registry/store.go | 15 +++++- controller/internal/registry/store_test.go | 54 ++++++++++++++++++++++ 2 files changed, 67 insertions(+), 2 deletions(-) create mode 100644 controller/internal/registry/store_test.go diff --git a/controller/internal/registry/store.go b/controller/internal/registry/store.go index 8a1aecdf..d5466589 100644 --- a/controller/internal/registry/store.go +++ b/controller/internal/registry/store.go @@ -18,6 +18,7 @@ import ( "context" "sync" + "google.golang.org/protobuf/proto" istioapi "istio.io/api/networking/v1alpha3" "mosn.io/htnn/controller/internal/log" @@ -45,8 +46,18 @@ func (store *serviceEntryStore) Update(service string, se *pkgRegistry.ServiceEn store.lock.Lock() defer store.lock.Unlock() - log.Infof("service entry stores update service: %s, entry: %v", service, &se.ServiceEntry) + log.Infof("service entry store updates service: %s, entry: %v", service, &se.ServiceEntry) ctx := context.Background() + + if prev, ok := store.entries[service]; ok { + // Some registry SDKs may send the same service entry multiple times. For example, at least in + // nacos-sdk-go 1.1.4, when the service is first subscribed, the SDK will run the callback + // twice. Here we decide to deduplicate in the store. + if proto.Equal(&se.ServiceEntry, prev) { + log.Infof("service %s not changed in service entry store, ignored", service) + return + } + } store.entries[service] = &se.ServiceEntry store.output.FromServiceRegistry(ctx, store.entries) @@ -60,7 +71,7 @@ func (store *serviceEntryStore) Delete(service string) { return } - log.Infof("service entry stores delete service: %s", service) + log.Infof("service entry store deletes service: %s", service) delete(store.entries, service) store.output.FromServiceRegistry(context.Background(), store.entries) } diff --git a/controller/internal/registry/store_test.go b/controller/internal/registry/store_test.go new file mode 100644 index 00000000..f5004bb1 --- /dev/null +++ b/controller/internal/registry/store_test.go @@ -0,0 +1,54 @@ +// Copyright The HTNN Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package registry + +import ( + "testing" + + "github.com/agiledragon/gomonkey/v2" + "github.com/stretchr/testify/require" + istioapi "istio.io/api/networking/v1alpha3" + + "mosn.io/htnn/controller/internal/controller/component" + pkgRegistry "mosn.io/htnn/controller/pkg/registry" + "mosn.io/htnn/controller/tests/pkg" +) + +func TestStoreRepeatedUpdate(t *testing.T) { + client := pkg.FakeK8sClient(t) + out := component.NewK8sOutput(client) + counter := 0 + + patches := gomonkey.ApplyMethodFunc(out, "FromServiceRegistry", func(ctx interface{}, serviceEntries map[string]*istioapi.ServiceEntry) { + counter++ + }) + defer patches.Reset() + + store := newServiceEntryStore(out) + sew := &pkgRegistry.ServiceEntryWrapper{ + ServiceEntry: istioapi.ServiceEntry{ + Hosts: []string{"test.default-group.public.earth.nacos"}, + }, + } + store.Update("test", sew) + sew2 := &pkgRegistry.ServiceEntryWrapper{ + ServiceEntry: istioapi.ServiceEntry{ + Hosts: []string{"test.default-group.public.earth.nacos"}, + }, + } + store.Update("test", sew2) + + require.Equal(t, 1, counter) +}