Skip to content

Commit

Permalink
fix: ignore correctly bootstrapped duplicates in WatchKind
Browse files Browse the repository at this point in the history
Previous implementation had a bug when it might ignore an event coming
for a resource which was deleted and then recreated under the same
resource version.

Co-authored-by: Utku Ozdemir <[email protected]>
Signed-off-by: Andrey Smirnov <[email protected]>
  • Loading branch information
smira and utkuozdemir committed Jan 25, 2023
1 parent bfd872f commit a81a529
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 23 deletions.
17 changes: 12 additions & 5 deletions pkg/state/impl/etcd/controller_runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,28 @@ import (
"github.com/cosi-project/runtime/pkg/resource/protobuf"
"github.com/cosi-project/runtime/pkg/state"
"github.com/cosi-project/runtime/pkg/state/impl/store"
"github.com/stretchr/testify/require"
suiterunner "github.com/stretchr/testify/suite"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/cosi-project/state-etcd/pkg/state/impl/etcd"
"github.com/cosi-project/state-etcd/pkg/util/testhelpers"
)

func must(err error) {
if err != nil {
panic(err)
}
}

func init() {
must(protobuf.RegisterResource(conformance.IntResourceType, &conformance.IntResource{}))
must(protobuf.RegisterResource(conformance.StrResourceType, &conformance.StrResource{}))
must(protobuf.RegisterResource(conformance.SentenceResourceType, &conformance.SentenceResource{}))
}

func TestRuntimeConformance(t *testing.T) {
t.Parallel()

require.NoError(t, protobuf.RegisterResource(conformance.IntResourceType, &conformance.IntResource{}))
require.NoError(t, protobuf.RegisterResource(conformance.StrResourceType, &conformance.StrResource{}))
require.NoError(t, protobuf.RegisterResource(conformance.SentenceResourceType, &conformance.SentenceResource{}))

testhelpers.WithEtcd(t, func(cli *clientv3.Client) {
suite := &conformance.RuntimeSuite{}
suite.SetupRuntime = func() {
Expand Down
19 changes: 6 additions & 13 deletions pkg/state/impl/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"fmt"
"sort"
"strconv"
"strings"
"time"

"github.com/cosi-project/runtime/pkg/resource"
Expand Down Expand Up @@ -113,7 +112,7 @@ func (st *State) List(ctx context.Context, resourceKind resource.Kind, opts ...s
}

sort.Slice(resources, func(i, j int) bool {
return strings.Compare(resources[i].Metadata().String(), resources[j].Metadata().String()) < 0
return resources[i].Metadata().ID() < resources[j].Metadata().ID()
})

return resource.List{
Expand Down Expand Up @@ -507,8 +506,6 @@ func (st *State) WatchKind(ctx context.Context, resourceKind resource.Kind, ch c
go func() {
defer cancel()

sentBootstrapEventSet := make(map[string]struct{}, len(bootstrapList))

// send initial contents if they were captured
for _, res := range bootstrapList {
if !channel.SendWithContext(ctx, ch,
Expand All @@ -519,8 +516,6 @@ func (st *State) WatchKind(ctx context.Context, resourceKind resource.Kind, ch c
) {
return
}

sentBootstrapEventSet[res.Metadata().String()] = struct{}{}
}

bootstrapList = nil
Expand Down Expand Up @@ -567,6 +562,11 @@ func (st *State) WatchKind(ctx context.Context, resourceKind resource.Kind, ch c
}

for _, etcdEvent := range watchResponse.Events {
// watch event might come for a revision which was already sent in the bootstrapped set, ignore it
if etcdEvent.Kv != nil && etcdEvent.Kv.ModRevision <= revision {
continue
}

event, err := st.convertEvent(etcdEvent)
if err != nil {
channel.SendWithContext(ctx, ch,
Expand Down Expand Up @@ -607,13 +607,6 @@ func (st *State) WatchKind(ctx context.Context, resourceKind resource.Kind, ch c
panic("should never be reached")
}

if !(event.Type == state.Destroyed) {
_, alreadySent := sentBootstrapEventSet[event.Resource.Metadata().String()]
if alreadySent {
continue
}
}

if !channel.SendWithContext(ctx, ch, event) {
return
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/state/impl/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package etcd_test

import (
"context"
"log"
"testing"
"time"

Expand All @@ -23,10 +22,7 @@ import (
)

func init() {
err := protobuf.RegisterResource(conformance.PathResourceType, &conformance.PathResource{})
if err != nil {
log.Fatalf("failed to register resource: %v", err)
}
must(protobuf.RegisterResource(conformance.PathResourceType, &conformance.PathResource{}))
}

func TestPreserveCreated(t *testing.T) {
Expand Down
107 changes: 107 additions & 0 deletions pkg/state/impl/etcd/watch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package etcd_test

import (
"context"
"fmt"
"testing"
"time"

"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/cosi-project/runtime/pkg/state/conformance"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestWatchKindWithBootstrap(t *testing.T) {
t.Parallel()

for _, test := range []struct {
name string
destroyIsTheLastOperation bool
}{
{
name: "put is last",
destroyIsTheLastOperation: false,
},
{
name: "delete is last",
destroyIsTheLastOperation: true,
},
} {
test := test

t.Run(test.name, func(t *testing.T) {
t.Parallel()

withEtcd(t, func(s state.State) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

for i := 0; i < 3; i++ {
require.NoError(t, s.Create(ctx, conformance.NewPathResource("default", fmt.Sprintf("path-%d", i))))
}

if test.destroyIsTheLastOperation {
require.NoError(t, s.Create(ctx, conformance.NewPathResource("default", "path-3")))
require.NoError(t, s.Destroy(ctx, conformance.NewPathResource("default", "path-3").Metadata()))
}

watchCh := make(chan state.Event)

require.NoError(t, s.WatchKind(ctx, conformance.NewPathResource("default", "").Metadata(), watchCh, state.WithBootstrapContents(true)))

for i := 0; i < 3; i++ {
select {
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
case ev := <-watchCh:
assert.Equal(t, state.Created, ev.Type)
assert.Equal(t, fmt.Sprintf("path-%d", i), ev.Resource.Metadata().ID())
assert.IsType(t, &conformance.PathResource{}, ev.Resource)
}
}

select {
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
case ev := <-watchCh:
assert.Equal(t, state.Bootstrapped, ev.Type)
}

require.NoError(t, s.Destroy(ctx, conformance.NewPathResource("default", "path-0").Metadata()))

select {
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
case ev := <-watchCh:
assert.Equal(t, state.Destroyed, ev.Type, "event %s %s", ev.Type, ev.Resource)
assert.Equal(t, "path-0", ev.Resource.Metadata().ID())
assert.IsType(t, &conformance.PathResource{}, ev.Resource)
}

newR, err := safe.StateUpdateWithConflicts(ctx, s, conformance.NewPathResource("default", "path-1").Metadata(), func(r *conformance.PathResource) error {
r.Metadata().Finalizers().Add("foo")

return nil
})
require.NoError(t, err)

select {
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
case ev := <-watchCh:
assert.Equal(t, state.Updated, ev.Type, "event %s %s", ev.Type, ev.Resource)
assert.Equal(t, "path-1", ev.Resource.Metadata().ID())
assert.Equal(t, newR.Metadata().Finalizers(), ev.Resource.Metadata().Finalizers())
assert.Equal(t, newR.Metadata().Version(), ev.Resource.Metadata().Version())
assert.IsType(t, &conformance.PathResource{}, ev.Resource)
}
})
})
}
}

0 comments on commit a81a529

Please sign in to comment.