From 7462e97a085715123d80d9fb0fbed55ab2f287cb Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Fri, 13 Dec 2024 16:06:36 +0400 Subject: [PATCH] feat: support invalid watch bookmark concept Catch up with https://github.com/cosi-project/runtime/pull/512. Signed-off-by: Andrey Smirnov --- go.mod | 18 ++++----- go.sum | 36 +++++++++--------- pkg/state/impl/etcd/errors.go | 14 +++++++ pkg/state/impl/etcd/errors_test.go | 3 ++ pkg/state/impl/etcd/etcd.go | 26 +++++++++++-- pkg/state/impl/etcd/etcd_test.go | 8 +++- pkg/state/impl/etcd/watch_test.go | 60 ++++++++++++++++++++++++++++++ 7 files changed, 134 insertions(+), 31 deletions(-) diff --git a/go.mod b/go.mod index 0dff1a3..fd2085f 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/cosi-project/state-etcd go 1.23.0 require ( - github.com/cosi-project/runtime v0.7.4 + github.com/cosi-project/runtime v0.7.6 github.com/siderolabs/gen v0.7.0 github.com/stretchr/testify v1.10.0 go.etcd.io/etcd/api/v3 v3.5.17 @@ -11,8 +11,8 @@ require ( go.etcd.io/etcd/server/v3 v3.5.17 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 - golang.org/x/sync v0.9.0 - google.golang.org/grpc v1.68.0 + golang.org/x/sync v0.10.0 + google.golang.org/grpc v1.68.1 ) require ( @@ -68,14 +68,14 @@ require ( go.opentelemetry.io/otel/trace v1.20.0 // indirect go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.29.0 // indirect - golang.org/x/net v0.31.0 // indirect - golang.org/x/sys v0.27.0 // indirect - golang.org/x/text v0.20.0 // indirect + golang.org/x/crypto v0.30.0 // indirect + golang.org/x/net v0.32.0 // indirect + golang.org/x/sys v0.28.0 // indirect + golang.org/x/text v0.21.0 // indirect golang.org/x/time v0.8.0 // indirect google.golang.org/genproto v0.0.0-20241202173237-19429a94021a // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20241206012308-a4fef0638583 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241206012308-a4fef0638583 // indirect google.golang.org/protobuf v1.35.2 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index b19f513..5955882 100644 --- a/go.sum +++ b/go.sum @@ -34,8 +34,8 @@ github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03VsM8rvUec= github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= -github.com/cosi-project/runtime v0.7.4 h1:DQAXhYQ9dkDHIGHsrCAuJOrTNaIp13gvrOUtJ4kkeys= -github.com/cosi-project/runtime v0.7.4/go.mod h1:9hGWkvz7PORXNzC/gJokapXvR+Fb/Mpl6Ic+05WDRMk= +github.com/cosi-project/runtime v0.7.6 h1:G6w4/g6EXrMakji0fHRDHvs9wltqF9LSDU/33er8gdc= +github.com/cosi-project/runtime v0.7.6/go.mod h1:AmDu/IfE/Q0YYzWRnAkDw2GNuMazpNpN9qyV1IErZdc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -248,8 +248,8 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ= -golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg= +golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY= +golang.org/x/crypto v0.30.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -272,8 +272,8 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.31.0 h1:68CPQngjLL0r2AlUKiSxtQFKvzRVbnzLwMUn5SzcLHo= -golang.org/x/net v0.31.0/go.mod h1:P4fl1q7dY2hnZFxEk4pPSkDHF+QqjitcnDjUQyMM+pM= +golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI= +golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -286,8 +286,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= -golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -307,15 +307,15 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= -golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= -golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -339,18 +339,18 @@ google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfG google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20241202173237-19429a94021a h1:4voejwOVTsjw6IMfnGt8IzTQBIw45hP8S0e77UMounA= google.golang.org/genproto v0.0.0-20241202173237-19429a94021a/go.mod h1:dW27OyXi0Ph+N43jeCWMFC86aTT5VgdeQtOSf0Hehdw= -google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a h1:OAiGFfOiA0v9MRYsSidp3ubZaBnteRUyn3xB2ZQ5G/E= -google.golang.org/genproto/googleapis/api v0.0.0-20241202173237-19429a94021a/go.mod h1:jehYqy3+AhJU9ve55aNOaSml7wUXjF9x6z2LcCfpAhY= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a h1:hgh8P4EuoxpsuKMXX/To36nOFD7vixReXgn8lPGnt+o= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU= +google.golang.org/genproto/googleapis/api v0.0.0-20241206012308-a4fef0638583 h1:v+j+5gpj0FopU0KKLDGfDo9ZRRpKdi5UBrCP0f76kuY= +google.golang.org/genproto/googleapis/api v0.0.0-20241206012308-a4fef0638583/go.mod h1:jehYqy3+AhJU9ve55aNOaSml7wUXjF9x6z2LcCfpAhY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241206012308-a4fef0638583 h1:IfdSdTcLFy4lqUQrQJLkLt1PB+AsqVz6lwkWPzWEz10= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241206012308-a4fef0638583/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= -google.golang.org/grpc v1.68.0 h1:aHQeeJbo8zAkAa3pRzrVjZlbz6uSfeOXlJNQM0RAbz0= -google.golang.org/grpc v1.68.0/go.mod h1:fmSPC5AsjSBCK54MyHRx48kpOti1/jRfOlwEWywNjWA= +google.golang.org/grpc v1.68.1 h1:oI5oTa11+ng8r8XMMN7jAOmWfPZWbYpCFaMUTACxkM0= +google.golang.org/grpc v1.68.1/go.mod h1:+q1XYFJjShcqn0QZHvCyeR4CXPA+llXIeUIfIe00waw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/pkg/state/impl/etcd/errors.go b/pkg/state/impl/etcd/errors.go index 1045786..4c05f6a 100644 --- a/pkg/state/impl/etcd/errors.go +++ b/pkg/state/impl/etcd/errors.go @@ -57,6 +57,13 @@ type eUnsupported struct { func (eUnsupported) UnsupportedError() {} +//nolint:errname +type eInvalidWatchBookmark struct { + error +} + +func (eInvalidWatchBookmark) InvalidWatchBookmarkError() {} + // ErrAlreadyExists generates error compatible with state.ErrConflict. func ErrAlreadyExists(r resource.Reference) error { return eConflict{ @@ -107,3 +114,10 @@ func ErrUnsupported(operation string) error { fmt.Errorf("operation %s is not supported", operation), } } + +// ErrInvalidWatchBookmark generates error compatible with state.ErrInvalidWatchBookmark. +func ErrInvalidWatchBookmark(e error) error { + return eInvalidWatchBookmark{ + e, + } +} diff --git a/pkg/state/impl/etcd/errors_test.go b/pkg/state/impl/etcd/errors_test.go index 7d8ee41..c999fb9 100644 --- a/pkg/state/impl/etcd/errors_test.go +++ b/pkg/state/impl/etcd/errors_test.go @@ -5,6 +5,7 @@ package etcd_test import ( + "errors" "testing" "github.com/cosi-project/runtime/pkg/resource" @@ -27,4 +28,6 @@ func TestErrors(t *testing.T) { require.True(t, state.IsConflictError(etcd.ErrAlreadyExists(res), state.WithResourceType("a"))) require.False(t, state.IsConflictError(etcd.ErrAlreadyExists(res), state.WithResourceType("b"))) require.True(t, state.IsConflictError(etcd.ErrAlreadyExists(res), state.WithResourceNamespace("ns"))) + + require.True(t, state.IsInvalidWatchBookmarkError(etcd.ErrInvalidWatchBookmark(errors.New("invalid")))) } diff --git a/pkg/state/impl/etcd/etcd.go b/pkg/state/impl/etcd/etcd.go index 2f21ac0..0f17526 100644 --- a/pkg/state/impl/etcd/etcd.go +++ b/pkg/state/impl/etcd/etcd.go @@ -8,6 +8,7 @@ package etcd import ( "context" "encoding/binary" + "errors" "fmt" "sort" "strconv" @@ -19,6 +20,7 @@ import ( "github.com/siderolabs/gen/channel" "github.com/siderolabs/gen/xslices" "go.etcd.io/etcd/api/v3/mvccpb" + "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" "github.com/cosi-project/state-etcd/pkg/util" @@ -346,7 +348,7 @@ func encodeBookmark(revision int64) state.Bookmark { func decodeBookmark(bookmark state.Bookmark) (int64, error) { if len(bookmark) != 8 { - return 0, fmt.Errorf("invalid bookmark length: %d", len(bookmark)) + return 0, ErrInvalidWatchBookmark(fmt.Errorf("invalid bookmark length: %d", len(bookmark))) } return int64(binary.BigEndian.Uint64(bookmark)), nil @@ -449,10 +451,19 @@ func (st *State) Watch(ctx context.Context, resourcePointer resource.Pointer, ch } if watchResponse.Err() != nil { + err := watchResponse.Err() + + switch { + case errors.Is(err, rpctypes.ErrCompacted): + err = ErrInvalidWatchBookmark(err) + case errors.Is(err, rpctypes.ErrFutureRev): + err = ErrInvalidWatchBookmark(err) + } + channel.SendWithContext(ctx, ch, state.Event{ Type: state.Errored, - Error: watchResponse.Err(), + Error: err, }, ) @@ -637,9 +648,18 @@ func (st *State) watchKind(ctx context.Context, resourceKind resource.Kind, sing } if watchResponse.Err() != nil { + err := watchResponse.Err() + + switch { + case errors.Is(err, rpctypes.ErrCompacted): + err = ErrInvalidWatchBookmark(err) + case errors.Is(err, rpctypes.ErrFutureRev): + err = ErrInvalidWatchBookmark(err) + } + watchErrorEvent := state.Event{ Type: state.Errored, - Error: watchResponse.Err(), + Error: err, } switch { diff --git a/pkg/state/impl/etcd/etcd_test.go b/pkg/state/impl/etcd/etcd_test.go index 0c17bf9..072560b 100644 --- a/pkg/state/impl/etcd/etcd_test.go +++ b/pkg/state/impl/etcd/etcd_test.go @@ -127,10 +127,16 @@ func TestClearGRPCMetadata(t *testing.T) { } func withEtcd(t *testing.T, f func(state.State)) { + withEtcdAndClient(t, func(st state.State, _ *clientv3.Client) { + f(st) + }) +} + +func withEtcdAndClient(t *testing.T, f func(state.State, *clientv3.Client)) { testhelpers.WithEtcd(t, func(cli *clientv3.Client) { etcdState := etcd.NewState(cli, store.ProtobufMarshaler{}, etcd.WithSalt([]byte("test123"))) st := state.WrapCore(etcdState) - f(st) + f(st, cli) }) } diff --git a/pkg/state/impl/etcd/watch_test.go b/pkg/state/impl/etcd/watch_test.go index 0dda2e9..988c3ab 100644 --- a/pkg/state/impl/etcd/watch_test.go +++ b/pkg/state/impl/etcd/watch_test.go @@ -15,6 +15,7 @@ import ( "github.com/cosi-project/runtime/pkg/state/conformance" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/goleak" ) @@ -344,3 +345,62 @@ func TestWatchKindStress(t *testing.T) { } }) } + +func TestWatchInvalidBookmark(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t, goleak.IgnoreCurrent()) }) + + withEtcdAndClient(t, func(s state.State, client *clientv3.Client) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + watchCh := make(chan state.Event) + + require.NoError(t, s.WatchKind(ctx, conformance.NewPathResource("default", "").Metadata(), watchCh)) + + require.NoError(t, s.Create(ctx, conformance.NewPathResource("default", "path-0"))) + + const watchEventTimeout = 10 * time.Second + + var bookmark []byte + + select { + case <-time.After(watchEventTimeout): + t.Fatal("timeout waiting for event") + case ev := <-watchCh: + assert.Equal(t, state.Created, ev.Type) + require.NotNil(t, ev.Bookmark) + + bookmark = ev.Bookmark + } + + // create one more resource + require.NoError(t, s.Create(ctx, conformance.NewPathResource("default", "path-1"))) + + // figure out last etcd revision + resp, err := client.Get(ctx, "default", clientv3.WithPrefix()) + require.NoError(t, err) + + lastRevision := resp.Header.Revision + + // create one more resource + require.NoError(t, s.Create(ctx, conformance.NewPathResource("default", "path-2"))) + + // compact away + _, err = client.Compact(ctx, lastRevision) + require.NoError(t, err) + + // try to watch with the old bookmark + watch2Ch := make(chan state.Event) + + err = s.WatchKind(ctx, conformance.NewPathResource("default", "").Metadata(), watch2Ch, state.WithKindStartFromBookmark(bookmark)) + require.NoError(t, err) + + select { + case <-time.After(watchEventTimeout): + t.Fatal("timeout waiting for event") + case ev := <-watch2Ch: + require.Error(t, ev.Error) + assert.True(t, state.IsInvalidWatchBookmarkError(ev.Error), "error: %v", ev.Error) + } + }) +}