From 867d4add336dc72a6455ec4deb8427200ddf7a6a Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Mon, 4 Nov 2024 14:58:37 +0400 Subject: [PATCH] feat: support watch bookmarks Support restarting watches from etcd. See https://github.com/cosi-project/runtime/pull/508 Signed-off-by: Andrey Smirnov --- .drone.yml | 4 +- Dockerfile | 8 +-- Makefile | 10 ++-- go.mod | 20 +++---- go.sum | 52 ++++++++-------- pkg/state/impl/etcd/etcd.go | 114 ++++++++++++++++++++++++------------ 6 files changed, 124 insertions(+), 84 deletions(-) diff --git a/.drone.yml b/.drone.yml index 8f368c1..aac788f 100644 --- a/.drone.yml +++ b/.drone.yml @@ -1,7 +1,7 @@ --- # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-08-29T10:55:47Z by kres 295eb33-dirty. +# Generated on 2024-11-04T10:03:44Z by kres dd14759. kind: pipeline type: kubernetes @@ -174,7 +174,7 @@ steps: services: - name: docker - image: docker:27.1-dind + image: docker:27.3-dind entrypoint: - dockerd commands: diff --git a/Dockerfile b/Dockerfile index 543770f..419f1d7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,8 +1,8 @@ -# syntax = docker/dockerfile-upstream:1.9.0-labs +# syntax = docker/dockerfile-upstream:1.10.0-labs # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-08-29T10:32:53Z by kres 295eb33-dirty. +# Generated on 2024-11-04T10:03:44Z by kres dd14759. ARG TOOLCHAIN @@ -10,9 +10,9 @@ ARG TOOLCHAIN FROM scratch AS generate # runs markdownlint -FROM docker.io/oven/bun:1.1.26-alpine AS lint-markdown +FROM docker.io/oven/bun:1.1.32-alpine AS lint-markdown WORKDIR /src -RUN bun i markdownlint-cli@0.41.0 sentences-per-line@0.2.1 +RUN bun i markdownlint-cli@0.42.0 sentences-per-line@0.2.1 COPY .markdownlint.json . COPY ./README.md ./README.md RUN bunx markdownlint --ignore "CHANGELOG.md" --ignore "**/node_modules/**" --ignore '**/hack/chglog/**' --rules node_modules/sentences-per-line/index.js . diff --git a/Makefile b/Makefile index e5bc662..b67a720 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2024-08-29T10:32:53Z by kres 295eb33-dirty. +# Generated on 2024-11-04T10:03:44Z by kres dd14759. # common variables @@ -17,15 +17,15 @@ WITH_RACE ?= false REGISTRY ?= ghcr.io USERNAME ?= cosi-project REGISTRY_AND_USERNAME ?= $(REGISTRY)/$(USERNAME) -PROTOBUF_GO_VERSION ?= 1.34.2 +PROTOBUF_GO_VERSION ?= 1.35.1 GRPC_GO_VERSION ?= 1.5.1 GRPC_GATEWAY_VERSION ?= 2.22.0 VTPROTOBUF_VERSION ?= 0.6.0 -GOIMPORTS_VERSION ?= 0.24.0 +GOIMPORTS_VERSION ?= 0.26.0 DEEPCOPY_VERSION ?= v0.5.6 -GOLANGCILINT_VERSION ?= v1.60.3 +GOLANGCILINT_VERSION ?= v1.61.0 GOFUMPT_VERSION ?= v0.7.0 -GO_VERSION ?= 1.23.0 +GO_VERSION ?= 1.23.2 GO_BUILDFLAGS ?= GO_LDFLAGS ?= CGO_ENABLED ?= 0 diff --git a/go.mod b/go.mod index 0e0368e..6ee0322 100644 --- a/go.mod +++ b/go.mod @@ -3,16 +3,16 @@ module github.com/cosi-project/state-etcd go 1.23.0 require ( - github.com/cosi-project/runtime v0.6.3 - github.com/siderolabs/gen v0.5.0 + github.com/cosi-project/runtime v0.7.1 + github.com/siderolabs/gen v0.7.0 github.com/stretchr/testify v1.9.0 - go.etcd.io/etcd/api/v3 v3.5.15 - go.etcd.io/etcd/client/v3 v3.5.15 - go.etcd.io/etcd/server/v3 v3.5.15 + go.etcd.io/etcd/api/v3 v3.5.16 + go.etcd.io/etcd/client/v3 v3.5.16 + go.etcd.io/etcd/server/v3 v3.5.16 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 golang.org/x/sync v0.8.0 - google.golang.org/grpc v1.66.0 + google.golang.org/grpc v1.67.1 ) require ( @@ -55,10 +55,10 @@ require ( github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect go.etcd.io/bbolt v1.3.11 // indirect - go.etcd.io/etcd/client/pkg/v3 v3.5.15 // indirect - go.etcd.io/etcd/client/v2 v2.305.15 // indirect - go.etcd.io/etcd/pkg/v3 v3.5.15 // indirect - go.etcd.io/etcd/raft/v3 v3.5.15 // indirect + go.etcd.io/etcd/client/pkg/v3 v3.5.16 // indirect + go.etcd.io/etcd/client/v2 v2.305.16 // indirect + go.etcd.io/etcd/pkg/v3 v3.5.16 // indirect + go.etcd.io/etcd/raft/v3 v3.5.16 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 // indirect go.opentelemetry.io/otel v1.20.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 // indirect diff --git a/go.sum b/go.sum index d6e2d2a..7b32d2b 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.112.0 h1:tpFCD7hpHFlQ8yPwT3x+QeXqc2T6+n6T+hmABHfDUSM= cloud.google.com/go/compute v1.23.3 h1:6sVlXXBmbd7jNX0Ipq0trII3e4n1/MsADLK6a+aiVlk= -cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc= -cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= +cloud.google.com/go/compute/metadata v0.5.0 h1:Zr0eK8JbFv6+Wi4ilXAR8FJ3wyNdpxHKJNPos6LTZOY= +cloud.google.com/go/compute/metadata v0.5.0/go.mod h1:aHnloV2TPI38yx4s9+wAZhHykWvVCfu7hQbF+9CWoiY= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -26,16 +26,16 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= -github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b h1:ga8SEFjZ60pxLcmhnThWgvH2wg8376yUJmPhEH4H3kw= -github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= +github.com/cncf/xds/go v0.0.0-20240723142845-024c85f92f20 h1:N+3sFI5GUjRKBi+i0TxYVST9h4Ie192jJWpHvthBBgg= +github.com/cncf/xds/go v0.0.0-20240723142845-024c85f92f20/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= github.com/cockroachdb/datadriven v1.0.2 h1:H9MtNqVoVhvd9nCBwOyDjUEdZCREqbIdCJD93PBm/jA= github.com/cockroachdb/datadriven v1.0.2/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= -github.com/cosi-project/runtime v0.6.3 h1:nCmEIY1jypCoraRaNsFvaLHkBI/ALYk/X1CGea7Ie1E= -github.com/cosi-project/runtime v0.6.3/go.mod h1:2iQ2Wu57UNm89ZksFLoMLGEZnPzUkk5g64+AlWHopo4= +github.com/cosi-project/runtime v0.7.1 h1:cOF2/ljLa0NPQV6/S1RZHcSUXM0UyOlD5F3fwwYSQEg= +github.com/cosi-project/runtime v0.7.1/go.mod h1:EMLs8a55tJ6zA4UyDbRsTvXBd6UIlNwZfCVGvCyiXK8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -45,8 +45,8 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= -github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= +github.com/envoyproxy/protoc-gen-validate v1.1.0 h1:tntQDh69XqOCOZsDz0lVJQez/2L6Uu2PdjCQwWCJ3bM= +github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4= github.com/gertd/go-pluralize v0.2.1 h1:M3uASbVjMnTsPb0PNqg+E/24Vwigyo/tvyMTtAlLgiA= github.com/gertd/go-pluralize v0.2.1/go.mod h1:rbYaKDbsXxmRfr8uygAEKhOWsjyrrqrkHVpZvoOp8zk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -166,8 +166,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= -github.com/siderolabs/gen v0.5.0 h1:Afdjx+zuZDf53eH5DB+E+T2JeCwBXGinV66A6osLgQI= -github.com/siderolabs/gen v0.5.0/go.mod h1:1GUMBNliW98Xeq8GPQeVMYqQE09LFItE8enR3wgMh3Q= +github.com/siderolabs/gen v0.7.0 h1:uHAt3WD0dof28NHFuguWBbDokaXQraR/HyVxCLw2QCU= +github.com/siderolabs/gen v0.7.0/go.mod h1:an3a2Y53O7kUjnnK8Bfu3gewtvnIOu5RTU6HalFtXQQ= github.com/siderolabs/go-pointer v1.0.0 h1:6TshPKep2doDQJAAtHUuHWXbca8ZfyRySjSBT/4GsMU= github.com/siderolabs/go-pointer v1.0.0/go.mod h1:HTRFUNYa3R+k0FFKNv11zgkaCLzEkWVzoYZ433P3kHc= github.com/siderolabs/go-retry v0.3.3 h1:zKV+S1vumtO72E6sYsLlmIdV/G/GcYSBLiEx/c9oCEg= @@ -199,20 +199,20 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.etcd.io/bbolt v1.3.11 h1:yGEzV1wPz2yVCLsD8ZAiGHhHVlczyC9d1rP43/VCRJ0= go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I= -go.etcd.io/etcd/api/v3 v3.5.15 h1:3KpLJir1ZEBrYuV2v+Twaa/e2MdDCEZ/70H+lzEiwsk= -go.etcd.io/etcd/api/v3 v3.5.15/go.mod h1:N9EhGzXq58WuMllgH9ZvnEr7SI9pS0k0+DHZezGp7jM= -go.etcd.io/etcd/client/pkg/v3 v3.5.15 h1:fo0HpWz/KlHGMCC+YejpiCmyWDEuIpnTDzpJLB5fWlA= -go.etcd.io/etcd/client/pkg/v3 v3.5.15/go.mod h1:mXDI4NAOwEiszrHCb0aqfAYNCrZP4e9hRca3d1YK8EU= -go.etcd.io/etcd/client/v2 v2.305.15 h1:VG2xbf8Vz1KJh65Ar2V5eDmfkp1bpzkSEHlhJM3usp8= -go.etcd.io/etcd/client/v2 v2.305.15/go.mod h1:Ad5dRjPVb/n5yXgAWQ/hXzuXXkBk0Y658ocuXYaUU48= -go.etcd.io/etcd/client/v3 v3.5.15 h1:23M0eY4Fd/inNv1ZfU3AxrbbOdW79r9V9Rl62Nm6ip4= -go.etcd.io/etcd/client/v3 v3.5.15/go.mod h1:CLSJxrYjvLtHsrPKsy7LmZEE+DK2ktfd2bN4RhBMwlU= -go.etcd.io/etcd/pkg/v3 v3.5.15 h1:/Iu6Sr3iYaAjy++8sIDoZW9/EfhcwLZwd4FOZX2mMOU= -go.etcd.io/etcd/pkg/v3 v3.5.15/go.mod h1:e3Acf298sPFmTCGTrnGvkClEw9RYIyPtNzi1XM8rets= -go.etcd.io/etcd/raft/v3 v3.5.15 h1:jOA2HJF7zb3wy8H/pL13e8geWqkEa/kUs0waUggZC0I= -go.etcd.io/etcd/raft/v3 v3.5.15/go.mod h1:k3r7P4seEiUcgxOPLp+mloJWV3Q4QLPGNvy/OgC8OtM= -go.etcd.io/etcd/server/v3 v3.5.15 h1:x35jrWnZgsRwMsFsUJIUdT1bvzIz1B+29HjMfRYVN/E= -go.etcd.io/etcd/server/v3 v3.5.15/go.mod h1:l9jX9oa/iuArjqz0RNX/TDbc70dLXxRZo/nmPucrpFo= +go.etcd.io/etcd/api/v3 v3.5.16 h1:WvmyJVbjWqK4R1E+B12RRHz3bRGy9XVfh++MgbN+6n0= +go.etcd.io/etcd/api/v3 v3.5.16/go.mod h1:1P4SlIP/VwkDmGo3OlOD7faPeP8KDIFhqvciH5EfN28= +go.etcd.io/etcd/client/pkg/v3 v3.5.16 h1:ZgY48uH6UvB+/7R9Yf4x574uCO3jIx0TRDyetSfId3Q= +go.etcd.io/etcd/client/pkg/v3 v3.5.16/go.mod h1:V8acl8pcEK0Y2g19YlOV9m9ssUe6MgiDSobSoaBAM0E= +go.etcd.io/etcd/client/v2 v2.305.16 h1:kQrn9o5czVNaukf2A2At43cE9ZtWauOtf9vRZuiKXow= +go.etcd.io/etcd/client/v2 v2.305.16/go.mod h1:h9YxWCzcdvZENbfzBTFCnoNumr2ax3F19sKMqHFmXHE= +go.etcd.io/etcd/client/v3 v3.5.16 h1:sSmVYOAHeC9doqi0gv7v86oY/BTld0SEFGaxsU9eRhE= +go.etcd.io/etcd/client/v3 v3.5.16/go.mod h1:X+rExSGkyqxvu276cr2OwPLBaeqFu1cIl4vmRjAD/50= +go.etcd.io/etcd/pkg/v3 v3.5.16 h1:cnavs5WSPWeK4TYwPYfmcr3Joz9BH+TZ6qoUtz6/+mc= +go.etcd.io/etcd/pkg/v3 v3.5.16/go.mod h1:+lutCZHG5MBBFI/U4eYT5yL7sJfnexsoM20Y0t2uNuY= +go.etcd.io/etcd/raft/v3 v3.5.16 h1:zBXA3ZUpYs1AwiLGPafYAKKl/CORn/uaxYDwlNwndAk= +go.etcd.io/etcd/raft/v3 v3.5.16/go.mod h1:P4UP14AxofMJ/54boWilabqqWoW9eLodl6I5GdGzazI= +go.etcd.io/etcd/server/v3 v3.5.16 h1:d0/SAdJ3vVsZvF8IFVb1k8zqMZ+heGcNfft71ul9GWE= +go.etcd.io/etcd/server/v3 v3.5.16/go.mod h1:ynhyZZpdDp1Gq49jkUg5mfkDWZwXnn3eIqCqtJnrD/s= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 h1:PzIubN4/sjByhDRHLviCjJuweBXWFZWhghjg7cS28+M= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0/go.mod h1:Ct6zzQEuGK3WpJs2n4dn+wfJYzd/+hNnxMRTWjGn30M= go.opentelemetry.io/otel v1.20.0 h1:vsb/ggIY+hUjD/zCAQHpzTmndPqv/ml2ArbsbfBYTAc= @@ -333,8 +333,8 @@ google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= -google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c= -google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= +google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= +google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/pkg/state/impl/etcd/etcd.go b/pkg/state/impl/etcd/etcd.go index e51f940..2f21ac0 100644 --- a/pkg/state/impl/etcd/etcd.go +++ b/pkg/state/impl/etcd/etcd.go @@ -7,6 +7,7 @@ package etcd import ( "context" + "encoding/binary" "fmt" "sort" "strconv" @@ -339,9 +340,21 @@ func (st *State) Destroy(ctx context.Context, resourcePointer resource.Pointer, return fmt.Errorf("failed to destroy: %w", ErrVersionConflict(resourcePointer, etcdVersion, foundVersion)) } +func encodeBookmark(revision int64) state.Bookmark { + return binary.BigEndian.AppendUint64(nil, uint64(revision)) +} + +func decodeBookmark(bookmark state.Bookmark) (int64, error) { + if len(bookmark) != 8 { + return 0, fmt.Errorf("invalid bookmark length: %d", len(bookmark)) + } + + return int64(binary.BigEndian.Uint64(bookmark)), nil +} + // Watch a resource. // -//nolint:gocognit +//nolint:gocognit,gocyclo,cyclop func (st *State) Watch(ctx context.Context, resourcePointer resource.Pointer, ch chan<- state.Event, opts ...state.WatchOption) error { ctx = st.clearIncomingContext(ctx) @@ -351,44 +364,53 @@ func (st *State) Watch(ctx context.Context, resourcePointer resource.Pointer, ch opt(&options) } - if options.TailEvents > 0 { - return fmt.Errorf("failed to watch: %w", ErrUnsupported("tailEvents")) - } - etcdKey := st.etcdKeyFromPointer(resourcePointer) var ( - curResource resource.Resource - err error + revision int64 + initialEvent state.Event ) - getResp, err := st.cli.Get(ctx, etcdKey) - if err != nil { - return fmt.Errorf("etcd call failed on watch %q: %w", resourcePointer, err) - } + switch { + case options.TailEvents > 0: + return fmt.Errorf("failed to watch: %w", ErrUnsupported("tailEvents")) + case options.StartFromBookmark != nil: + var err error - var initialEvent state.Event + revision, err = decodeBookmark(options.StartFromBookmark) + if err != nil { + return fmt.Errorf("failed to watch %q: %w", resourcePointer, err) + } + default: + var curResource resource.Resource - if len(getResp.Kvs) > 0 { - curResource, err = st.unmarshalResource(getResp.Kvs[0]) + getResp, err := st.cli.Get(ctx, etcdKey) if err != nil { - return fmt.Errorf("failed to unmarshal on watch %q: %w", resourcePointer, err) + return fmt.Errorf("etcd call failed on watch %q: %w", resourcePointer, err) } - initialEvent.Resource = curResource - initialEvent.Type = state.Created - } else { - initialEvent.Resource = resource.NewTombstone( - resource.NewMetadata( - resourcePointer.Namespace(), - resourcePointer.Type(), - resourcePointer.ID(), - resource.VersionUndefined, - )) - initialEvent.Type = state.Destroyed - } + revision = getResp.Header.Revision + initialEvent.Bookmark = encodeBookmark(getResp.Header.Revision) - revision := getResp.Header.Revision + if len(getResp.Kvs) > 0 { + curResource, err = st.unmarshalResource(getResp.Kvs[0]) + if err != nil { + return fmt.Errorf("failed to unmarshal on watch %q: %w", resourcePointer, err) + } + + initialEvent.Resource = curResource + initialEvent.Type = state.Created + } else { + initialEvent.Resource = resource.NewTombstone( + resource.NewMetadata( + resourcePointer.Namespace(), + resourcePointer.Type(), + resourcePointer.ID(), + resource.VersionUndefined, + )) + initialEvent.Type = state.Destroyed + } + } // wrap the context to make sure Watch is aborted if the loop terminates ctx, cancel := context.WithCancel(ctx) @@ -404,8 +426,10 @@ func (st *State) Watch(ctx context.Context, resourcePointer resource.Pointer, ch }() defer cancel() - if !channel.SendWithContext(ctx, ch, initialEvent) { - return + if initialEvent.Resource != nil { + if !channel.SendWithContext(ctx, ch, initialEvent) { + return + } } for { @@ -490,17 +514,26 @@ func (st *State) watchKind(ctx context.Context, resourceKind resource.Kind, sing return options.LabelQueries.Matches(*res.Metadata().Labels()) && options.IDQuery.Matches(*res.Metadata()) } - if options.TailEvents > 0 { - return fmt.Errorf("failed to %s: %w", opName, ErrUnsupported("tailEvents")) - } - etcdKey := st.etcdKeyPrefixFromKind(resourceKind) - var bootstrapList []resource.Resource + var ( + bootstrapList []resource.Resource + revision int64 + ) - var revision int64 + switch { + case options.TailEvents > 0: + return fmt.Errorf("failed to %s: %w", opName, ErrUnsupported("tailEvents")) + case options.StartFromBookmark != nil && options.BootstrapContents: + return fmt.Errorf("failed to %s: %w", opName, ErrUnsupported("startFromBookmark and bootstrapContents")) + case options.StartFromBookmark != nil: + var err error - if options.BootstrapContents { + revision, err = decodeBookmark(options.StartFromBookmark) + if err != nil { + return fmt.Errorf("failed to %s %q: %w", opName, resourceKind, err) + } + case options.BootstrapContents: getResp, err := st.cli.Get(ctx, etcdKey, clientv3.WithPrefix()) if err != nil { return fmt.Errorf("etcd call failed on %s %q: %w", opName, resourceKind, err) @@ -559,6 +592,7 @@ func (st *State) watchKind(ctx context.Context, resourceKind resource.Kind, sing state.Event{ Type: state.Bootstrapped, Resource: resource.NewTombstone(resource.NewMetadata(resourceKind.Namespace(), resourceKind.Type(), "", resource.VersionUndefined)), + Bookmark: encodeBookmark(revision), }, ) { return @@ -574,6 +608,7 @@ func (st *State) watchKind(ctx context.Context, resourceKind resource.Kind, sing events = append(events, state.Event{ Type: state.Bootstrapped, Resource: resource.NewTombstone(resource.NewMetadata(resourceKind.Namespace(), resourceKind.Type(), "", resource.VersionUndefined)), + Bookmark: encodeBookmark(revision), }) if !channel.SendWithContext(ctx, aggCh, events) { @@ -741,18 +776,23 @@ func (st *State) convertEvent(etcdEvent *clientv3.Event) (state.Event, error) { return state.Event{ Resource: previous, Type: state.Destroyed, + Bookmark: encodeBookmark(etcdEvent.Kv.ModRevision), }, nil } eventType := state.Updated + bookmark := encodeBookmark(etcdEvent.Kv.ModRevision) + if etcdEvent.IsCreate() { eventType = state.Created + bookmark = encodeBookmark(etcdEvent.Kv.CreateRevision) } return state.Event{ Resource: current, Old: previous, Type: eventType, + Bookmark: bookmark, }, nil }