From a81a52918c0ecb64ae76448c25fee755a2376b23 Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Wed, 25 Jan 2023 17:24:31 +0400 Subject: [PATCH] fix: ignore correctly bootstrapped duplicates in WatchKind Previous implementation had a bug when it might ignore an event coming for a resource which was deleted and then recreated under the same resource version. Co-authored-by: Utku Ozdemir Signed-off-by: Andrey Smirnov --- .../impl/etcd/controller_runtime_test.go | 17 ++- pkg/state/impl/etcd/etcd.go | 19 +--- pkg/state/impl/etcd/etcd_test.go | 6 +- pkg/state/impl/etcd/watch_test.go | 107 ++++++++++++++++++ 4 files changed, 126 insertions(+), 23 deletions(-) create mode 100644 pkg/state/impl/etcd/watch_test.go diff --git a/pkg/state/impl/etcd/controller_runtime_test.go b/pkg/state/impl/etcd/controller_runtime_test.go index 574971d..59ae7b4 100644 --- a/pkg/state/impl/etcd/controller_runtime_test.go +++ b/pkg/state/impl/etcd/controller_runtime_test.go @@ -13,7 +13,6 @@ import ( "github.com/cosi-project/runtime/pkg/resource/protobuf" "github.com/cosi-project/runtime/pkg/state" "github.com/cosi-project/runtime/pkg/state/impl/store" - "github.com/stretchr/testify/require" suiterunner "github.com/stretchr/testify/suite" clientv3 "go.etcd.io/etcd/client/v3" @@ -21,13 +20,21 @@ import ( "github.com/cosi-project/state-etcd/pkg/util/testhelpers" ) +func must(err error) { + if err != nil { + panic(err) + } +} + +func init() { + must(protobuf.RegisterResource(conformance.IntResourceType, &conformance.IntResource{})) + must(protobuf.RegisterResource(conformance.StrResourceType, &conformance.StrResource{})) + must(protobuf.RegisterResource(conformance.SentenceResourceType, &conformance.SentenceResource{})) +} + func TestRuntimeConformance(t *testing.T) { t.Parallel() - require.NoError(t, protobuf.RegisterResource(conformance.IntResourceType, &conformance.IntResource{})) - require.NoError(t, protobuf.RegisterResource(conformance.StrResourceType, &conformance.StrResource{})) - require.NoError(t, protobuf.RegisterResource(conformance.SentenceResourceType, &conformance.SentenceResource{})) - testhelpers.WithEtcd(t, func(cli *clientv3.Client) { suite := &conformance.RuntimeSuite{} suite.SetupRuntime = func() { diff --git a/pkg/state/impl/etcd/etcd.go b/pkg/state/impl/etcd/etcd.go index 89633fe..745017f 100644 --- a/pkg/state/impl/etcd/etcd.go +++ b/pkg/state/impl/etcd/etcd.go @@ -10,7 +10,6 @@ import ( "fmt" "sort" "strconv" - "strings" "time" "github.com/cosi-project/runtime/pkg/resource" @@ -113,7 +112,7 @@ func (st *State) List(ctx context.Context, resourceKind resource.Kind, opts ...s } sort.Slice(resources, func(i, j int) bool { - return strings.Compare(resources[i].Metadata().String(), resources[j].Metadata().String()) < 0 + return resources[i].Metadata().ID() < resources[j].Metadata().ID() }) return resource.List{ @@ -507,8 +506,6 @@ func (st *State) WatchKind(ctx context.Context, resourceKind resource.Kind, ch c go func() { defer cancel() - sentBootstrapEventSet := make(map[string]struct{}, len(bootstrapList)) - // send initial contents if they were captured for _, res := range bootstrapList { if !channel.SendWithContext(ctx, ch, @@ -519,8 +516,6 @@ func (st *State) WatchKind(ctx context.Context, resourceKind resource.Kind, ch c ) { return } - - sentBootstrapEventSet[res.Metadata().String()] = struct{}{} } bootstrapList = nil @@ -567,6 +562,11 @@ func (st *State) WatchKind(ctx context.Context, resourceKind resource.Kind, ch c } for _, etcdEvent := range watchResponse.Events { + // watch event might come for a revision which was already sent in the bootstrapped set, ignore it + if etcdEvent.Kv != nil && etcdEvent.Kv.ModRevision <= revision { + continue + } + event, err := st.convertEvent(etcdEvent) if err != nil { channel.SendWithContext(ctx, ch, @@ -607,13 +607,6 @@ func (st *State) WatchKind(ctx context.Context, resourceKind resource.Kind, ch c panic("should never be reached") } - if !(event.Type == state.Destroyed) { - _, alreadySent := sentBootstrapEventSet[event.Resource.Metadata().String()] - if alreadySent { - continue - } - } - if !channel.SendWithContext(ctx, ch, event) { return } diff --git a/pkg/state/impl/etcd/etcd_test.go b/pkg/state/impl/etcd/etcd_test.go index e604c78..5eab016 100644 --- a/pkg/state/impl/etcd/etcd_test.go +++ b/pkg/state/impl/etcd/etcd_test.go @@ -6,7 +6,6 @@ package etcd_test import ( "context" - "log" "testing" "time" @@ -23,10 +22,7 @@ import ( ) func init() { - err := protobuf.RegisterResource(conformance.PathResourceType, &conformance.PathResource{}) - if err != nil { - log.Fatalf("failed to register resource: %v", err) - } + must(protobuf.RegisterResource(conformance.PathResourceType, &conformance.PathResource{})) } func TestPreserveCreated(t *testing.T) { diff --git a/pkg/state/impl/etcd/watch_test.go b/pkg/state/impl/etcd/watch_test.go new file mode 100644 index 0000000..6d00370 --- /dev/null +++ b/pkg/state/impl/etcd/watch_test.go @@ -0,0 +1,107 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package etcd_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/cosi-project/runtime/pkg/safe" + "github.com/cosi-project/runtime/pkg/state" + "github.com/cosi-project/runtime/pkg/state/conformance" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestWatchKindWithBootstrap(t *testing.T) { + t.Parallel() + + for _, test := range []struct { + name string + destroyIsTheLastOperation bool + }{ + { + name: "put is last", + destroyIsTheLastOperation: false, + }, + { + name: "delete is last", + destroyIsTheLastOperation: true, + }, + } { + test := test + + t.Run(test.name, func(t *testing.T) { + t.Parallel() + + withEtcd(t, func(s state.State) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + for i := 0; i < 3; i++ { + require.NoError(t, s.Create(ctx, conformance.NewPathResource("default", fmt.Sprintf("path-%d", i)))) + } + + if test.destroyIsTheLastOperation { + require.NoError(t, s.Create(ctx, conformance.NewPathResource("default", "path-3"))) + require.NoError(t, s.Destroy(ctx, conformance.NewPathResource("default", "path-3").Metadata())) + } + + watchCh := make(chan state.Event) + + require.NoError(t, s.WatchKind(ctx, conformance.NewPathResource("default", "").Metadata(), watchCh, state.WithBootstrapContents(true))) + + for i := 0; i < 3; i++ { + select { + case <-time.After(time.Second): + t.Fatal("timeout waiting for event") + case ev := <-watchCh: + assert.Equal(t, state.Created, ev.Type) + assert.Equal(t, fmt.Sprintf("path-%d", i), ev.Resource.Metadata().ID()) + assert.IsType(t, &conformance.PathResource{}, ev.Resource) + } + } + + select { + case <-time.After(time.Second): + t.Fatal("timeout waiting for event") + case ev := <-watchCh: + assert.Equal(t, state.Bootstrapped, ev.Type) + } + + require.NoError(t, s.Destroy(ctx, conformance.NewPathResource("default", "path-0").Metadata())) + + select { + case <-time.After(time.Second): + t.Fatal("timeout waiting for event") + case ev := <-watchCh: + assert.Equal(t, state.Destroyed, ev.Type, "event %s %s", ev.Type, ev.Resource) + assert.Equal(t, "path-0", ev.Resource.Metadata().ID()) + assert.IsType(t, &conformance.PathResource{}, ev.Resource) + } + + newR, err := safe.StateUpdateWithConflicts(ctx, s, conformance.NewPathResource("default", "path-1").Metadata(), func(r *conformance.PathResource) error { + r.Metadata().Finalizers().Add("foo") + + return nil + }) + require.NoError(t, err) + + select { + case <-time.After(time.Second): + t.Fatal("timeout waiting for event") + case ev := <-watchCh: + assert.Equal(t, state.Updated, ev.Type, "event %s %s", ev.Type, ev.Resource) + assert.Equal(t, "path-1", ev.Resource.Metadata().ID()) + assert.Equal(t, newR.Metadata().Finalizers(), ev.Resource.Metadata().Finalizers()) + assert.Equal(t, newR.Metadata().Version(), ev.Resource.Metadata().Version()) + assert.IsType(t, &conformance.PathResource{}, ev.Resource) + } + }) + }) + } +}