Skip to content

Commit

Permalink
feat: support invalid watch bookmark concept
Browse files Browse the repository at this point in the history
Catch up with cosi-project/runtime#512.

Signed-off-by: Andrey Smirnov <[email protected]>
  • Loading branch information
smira committed Dec 13, 2024
1 parent 36d3cda commit 7462e97
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 31 deletions.
18 changes: 9 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ module github.com/cosi-project/state-etcd
go 1.23.0

require (
github.com/cosi-project/runtime v0.7.4
github.com/cosi-project/runtime v0.7.6
github.com/siderolabs/gen v0.7.0
github.com/stretchr/testify v1.10.0
go.etcd.io/etcd/api/v3 v3.5.17
go.etcd.io/etcd/client/v3 v3.5.17
go.etcd.io/etcd/server/v3 v3.5.17
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
golang.org/x/sync v0.9.0
google.golang.org/grpc v1.68.0
golang.org/x/sync v0.10.0
google.golang.org/grpc v1.68.1
)

require (
Expand Down Expand Up @@ -68,14 +68,14 @@ require (
go.opentelemetry.io/otel/trace v1.20.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.29.0 // indirect
golang.org/x/net v0.31.0 // indirect
golang.org/x/sys v0.27.0 // indirect
golang.org/x/text v0.20.0 // indirect
golang.org/x/crypto v0.30.0 // indirect
golang.org/x/net v0.32.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.8.0 // indirect
google.golang.org/genproto v0.0.0-20241202173237-19429a94021a // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241206012308-a4fef0638583 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241206012308-a4fef0638583 // indirect
google.golang.org/protobuf v1.35.2 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
36 changes: 18 additions & 18 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr
github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03VsM8rvUec=
github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cosi-project/runtime v0.7.4 h1:DQAXhYQ9dkDHIGHsrCAuJOrTNaIp13gvrOUtJ4kkeys=
github.com/cosi-project/runtime v0.7.4/go.mod h1:9hGWkvz7PORXNzC/gJokapXvR+Fb/Mpl6Ic+05WDRMk=
github.com/cosi-project/runtime v0.7.6 h1:G6w4/g6EXrMakji0fHRDHvs9wltqF9LSDU/33er8gdc=
github.com/cosi-project/runtime v0.7.6/go.mod h1:AmDu/IfE/Q0YYzWRnAkDw2GNuMazpNpN9qyV1IErZdc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -248,8 +248,8 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ=
golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg=
golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY=
golang.org/x/crypto v0.30.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
Expand All @@ -272,8 +272,8 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.31.0 h1:68CPQngjLL0r2AlUKiSxtQFKvzRVbnzLwMUn5SzcLHo=
golang.org/x/net v0.31.0/go.mod h1:P4fl1q7dY2hnZFxEk4pPSkDHF+QqjitcnDjUQyMM+pM=
golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI=
golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand All @@ -286,8 +286,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ=
golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand All @@ -307,15 +307,15 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug=
golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg=
golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand All @@ -339,18 +339,18 @@ google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfG
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20241202173237-19429a94021a h1:4voejwOVTsjw6IMfnGt8IzTQBIw45hP8S0e77UMounA=
google.golang.org/genproto v0.0.0-20241202173237-19429a94021a/go.mod h1:dW27OyXi0Ph+N43jeCWMFC86aTT5VgdeQtOSf0Hehdw=
google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a h1:OAiGFfOiA0v9MRYsSidp3ubZaBnteRUyn3xB2ZQ5G/E=
google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a/go.mod h1:jehYqy3+AhJU9ve55aNOaSml7wUXjF9x6z2LcCfpAhY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a h1:hgh8P4EuoxpsuKMXX/To36nOFD7vixReXgn8lPGnt+o=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU=
google.golang.org/genproto/googleapis/api v0.0.0-20241206012308-a4fef0638583 h1:v+j+5gpj0FopU0KKLDGfDo9ZRRpKdi5UBrCP0f76kuY=
google.golang.org/genproto/googleapis/api v0.0.0-20241206012308-a4fef0638583/go.mod h1:jehYqy3+AhJU9ve55aNOaSml7wUXjF9x6z2LcCfpAhY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241206012308-a4fef0638583 h1:IfdSdTcLFy4lqUQrQJLkLt1PB+AsqVz6lwkWPzWEz10=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241206012308-a4fef0638583/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.68.0 h1:aHQeeJbo8zAkAa3pRzrVjZlbz6uSfeOXlJNQM0RAbz0=
google.golang.org/grpc v1.68.0/go.mod h1:fmSPC5AsjSBCK54MyHRx48kpOti1/jRfOlwEWywNjWA=
google.golang.org/grpc v1.68.1 h1:oI5oTa11+ng8r8XMMN7jAOmWfPZWbYpCFaMUTACxkM0=
google.golang.org/grpc v1.68.1/go.mod h1:+q1XYFJjShcqn0QZHvCyeR4CXPA+llXIeUIfIe00waw=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
14 changes: 14 additions & 0 deletions pkg/state/impl/etcd/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ type eUnsupported struct {

func (eUnsupported) UnsupportedError() {}

//nolint:errname
type eInvalidWatchBookmark struct {
error
}

func (eInvalidWatchBookmark) InvalidWatchBookmarkError() {}

// ErrAlreadyExists generates error compatible with state.ErrConflict.
func ErrAlreadyExists(r resource.Reference) error {
return eConflict{
Expand Down Expand Up @@ -107,3 +114,10 @@ func ErrUnsupported(operation string) error {
fmt.Errorf("operation %s is not supported", operation),
}
}

// ErrInvalidWatchBookmark generates error compatible with state.ErrInvalidWatchBookmark.
func ErrInvalidWatchBookmark(e error) error {
return eInvalidWatchBookmark{
e,
}
}
3 changes: 3 additions & 0 deletions pkg/state/impl/etcd/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package etcd_test

import (
"errors"
"testing"

"github.com/cosi-project/runtime/pkg/resource"
Expand All @@ -27,4 +28,6 @@ func TestErrors(t *testing.T) {
require.True(t, state.IsConflictError(etcd.ErrAlreadyExists(res), state.WithResourceType("a")))
require.False(t, state.IsConflictError(etcd.ErrAlreadyExists(res), state.WithResourceType("b")))
require.True(t, state.IsConflictError(etcd.ErrAlreadyExists(res), state.WithResourceNamespace("ns")))

require.True(t, state.IsInvalidWatchBookmarkError(etcd.ErrInvalidWatchBookmark(errors.New("invalid"))))
}
26 changes: 23 additions & 3 deletions pkg/state/impl/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package etcd
import (
"context"
"encoding/binary"
"errors"
"fmt"
"sort"
"strconv"
Expand All @@ -19,6 +20,7 @@ import (
"github.com/siderolabs/gen/channel"
"github.com/siderolabs/gen/xslices"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/cosi-project/state-etcd/pkg/util"
Expand Down Expand Up @@ -346,7 +348,7 @@ func encodeBookmark(revision int64) state.Bookmark {

func decodeBookmark(bookmark state.Bookmark) (int64, error) {
if len(bookmark) != 8 {
return 0, fmt.Errorf("invalid bookmark length: %d", len(bookmark))
return 0, ErrInvalidWatchBookmark(fmt.Errorf("invalid bookmark length: %d", len(bookmark)))
}

return int64(binary.BigEndian.Uint64(bookmark)), nil
Expand Down Expand Up @@ -449,10 +451,19 @@ func (st *State) Watch(ctx context.Context, resourcePointer resource.Pointer, ch
}

if watchResponse.Err() != nil {
err := watchResponse.Err()

switch {
case errors.Is(err, rpctypes.ErrCompacted):
err = ErrInvalidWatchBookmark(err)
case errors.Is(err, rpctypes.ErrFutureRev):
err = ErrInvalidWatchBookmark(err)
}

channel.SendWithContext(ctx, ch,
state.Event{
Type: state.Errored,
Error: watchResponse.Err(),
Error: err,
},
)

Expand Down Expand Up @@ -637,9 +648,18 @@ func (st *State) watchKind(ctx context.Context, resourceKind resource.Kind, sing
}

if watchResponse.Err() != nil {
err := watchResponse.Err()

switch {
case errors.Is(err, rpctypes.ErrCompacted):
err = ErrInvalidWatchBookmark(err)
case errors.Is(err, rpctypes.ErrFutureRev):
err = ErrInvalidWatchBookmark(err)
}

watchErrorEvent := state.Event{
Type: state.Errored,
Error: watchResponse.Err(),
Error: err,
}

switch {
Expand Down
8 changes: 7 additions & 1 deletion pkg/state/impl/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,16 @@ func TestClearGRPCMetadata(t *testing.T) {
}

func withEtcd(t *testing.T, f func(state.State)) {
withEtcdAndClient(t, func(st state.State, _ *clientv3.Client) {
f(st)
})
}

func withEtcdAndClient(t *testing.T, f func(state.State, *clientv3.Client)) {
testhelpers.WithEtcd(t, func(cli *clientv3.Client) {
etcdState := etcd.NewState(cli, store.ProtobufMarshaler{}, etcd.WithSalt([]byte("test123")))
st := state.WrapCore(etcdState)

f(st)
f(st, cli)
})
}
60 changes: 60 additions & 0 deletions pkg/state/impl/etcd/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cosi-project/runtime/pkg/state/conformance"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/goleak"
)

Expand Down Expand Up @@ -344,3 +345,62 @@ func TestWatchKindStress(t *testing.T) {
}
})
}

func TestWatchInvalidBookmark(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t, goleak.IgnoreCurrent()) })

withEtcdAndClient(t, func(s state.State, client *clientv3.Client) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

watchCh := make(chan state.Event)

require.NoError(t, s.WatchKind(ctx, conformance.NewPathResource("default", "").Metadata(), watchCh))

require.NoError(t, s.Create(ctx, conformance.NewPathResource("default", "path-0")))

const watchEventTimeout = 10 * time.Second

var bookmark []byte

select {
case <-time.After(watchEventTimeout):
t.Fatal("timeout waiting for event")
case ev := <-watchCh:
assert.Equal(t, state.Created, ev.Type)
require.NotNil(t, ev.Bookmark)

bookmark = ev.Bookmark
}

// create one more resource
require.NoError(t, s.Create(ctx, conformance.NewPathResource("default", "path-1")))

// figure out last etcd revision
resp, err := client.Get(ctx, "default", clientv3.WithPrefix())
require.NoError(t, err)

lastRevision := resp.Header.Revision

// create one more resource
require.NoError(t, s.Create(ctx, conformance.NewPathResource("default", "path-2")))

// compact away
_, err = client.Compact(ctx, lastRevision)
require.NoError(t, err)

// try to watch with the old bookmark
watch2Ch := make(chan state.Event)

err = s.WatchKind(ctx, conformance.NewPathResource("default", "").Metadata(), watch2Ch, state.WithKindStartFromBookmark(bookmark))
require.NoError(t, err)

select {
case <-time.After(watchEventTimeout):
t.Fatal("timeout waiting for event")
case ev := <-watch2Ch:
require.Error(t, ev.Error)
assert.True(t, state.IsInvalidWatchBookmarkError(ev.Error), "error: %v", ev.Error)
}
})
}

0 comments on commit 7462e97

Please sign in to comment.