Skip to content

Commit

Permalink
deduplicate update event in the service entry store (#568)
Browse files Browse the repository at this point in the history
Signed-off-by: spacewander <[email protected]>
  • Loading branch information
spacewander authored Jun 6, 2024
1 parent d79efc5 commit 877d43e
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 2 deletions.
15 changes: 13 additions & 2 deletions controller/internal/registry/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"sync"

"google.golang.org/protobuf/proto"
istioapi "istio.io/api/networking/v1alpha3"

"mosn.io/htnn/controller/internal/log"
Expand Down Expand Up @@ -45,8 +46,18 @@ func (store *serviceEntryStore) Update(service string, se *pkgRegistry.ServiceEn
store.lock.Lock()
defer store.lock.Unlock()

log.Infof("service entry stores update service: %s, entry: %v", service, &se.ServiceEntry)
log.Infof("service entry store updates service: %s, entry: %v", service, &se.ServiceEntry)
ctx := context.Background()

if prev, ok := store.entries[service]; ok {
// Some registry SDKs may send the same service entry multiple times. For example, at least in
// nacos-sdk-go 1.1.4, when the service is first subscribed, the SDK will run the callback
// twice. Here we decide to deduplicate in the store.
if proto.Equal(&se.ServiceEntry, prev) {
log.Infof("service %s not changed in service entry store, ignored", service)
return
}
}
store.entries[service] = &se.ServiceEntry

store.output.FromServiceRegistry(ctx, store.entries)
Expand All @@ -60,7 +71,7 @@ func (store *serviceEntryStore) Delete(service string) {
return
}

log.Infof("service entry stores delete service: %s", service)
log.Infof("service entry store deletes service: %s", service)
delete(store.entries, service)
store.output.FromServiceRegistry(context.Background(), store.entries)
}
54 changes: 54 additions & 0 deletions controller/internal/registry/store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright The HTNN Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package registry

import (
"testing"

"github.com/agiledragon/gomonkey/v2"
"github.com/stretchr/testify/require"
istioapi "istio.io/api/networking/v1alpha3"

"mosn.io/htnn/controller/internal/controller/component"
pkgRegistry "mosn.io/htnn/controller/pkg/registry"
"mosn.io/htnn/controller/tests/pkg"
)

func TestStoreRepeatedUpdate(t *testing.T) {
client := pkg.FakeK8sClient(t)
out := component.NewK8sOutput(client)
counter := 0

patches := gomonkey.ApplyMethodFunc(out, "FromServiceRegistry", func(ctx interface{}, serviceEntries map[string]*istioapi.ServiceEntry) {
counter++
})
defer patches.Reset()

store := newServiceEntryStore(out)
sew := &pkgRegistry.ServiceEntryWrapper{
ServiceEntry: istioapi.ServiceEntry{
Hosts: []string{"test.default-group.public.earth.nacos"},
},
}
store.Update("test", sew)
sew2 := &pkgRegistry.ServiceEntryWrapper{
ServiceEntry: istioapi.ServiceEntry{
Hosts: []string{"test.default-group.public.earth.nacos"},
},
}
store.Update("test", sew2)

require.Equal(t, 1, counter)
}

0 comments on commit 877d43e

Please sign in to comment.