Skip to content

Commit

Permalink
feat: implement 'WatchKindAggregated' for etcd state
Browse files Browse the repository at this point in the history
This mirrors cosi-project/runtime#237 for `etcd`-backed implementation.

Signed-off-by: Andrey Smirnov <[email protected]>
  • Loading branch information
smira committed Mar 20, 2023
1 parent 2705e42 commit f219016
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 37 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/cosi-project/state-etcd
go 1.20

require (
github.com/cosi-project/runtime v0.3.0-alpha.8.0.20230313144548-4fd36fe6ac81
github.com/cosi-project/runtime v0.3.0-alpha.10
github.com/siderolabs/gen v0.4.3
github.com/stretchr/testify v1.8.2
go.etcd.io/etcd/api/v3 v3.5.7
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmf
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI=
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cosi-project/runtime v0.3.0-alpha.8.0.20230313144548-4fd36fe6ac81 h1:A9pBfkoULcxkN3eW2DldGMw/VivG7IDsW/XeNkhWB/c=
github.com/cosi-project/runtime v0.3.0-alpha.8.0.20230313144548-4fd36fe6ac81/go.mod h1:jO9Dp1D+mSRgQGhrbI+PhXFGrm6H+/IXYVSPF7sucpg=
github.com/cosi-project/runtime v0.3.0-alpha.10 h1:WMk620imAuRTDeEPkwyAXu6lR59FdJNq9CpaWu5lsU0=
github.com/cosi-project/runtime v0.3.0-alpha.10/go.mod h1:BBFzt+ANf4BSlVGIbemVuz+Hql8F3tveWaWoErUi0lY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
116 changes: 82 additions & 34 deletions pkg/state/impl/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cosi-project/runtime/pkg/state"
"github.com/cosi-project/runtime/pkg/state/impl/store"
"github.com/siderolabs/gen/channel"
"github.com/siderolabs/gen/slices"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"

Expand Down Expand Up @@ -450,9 +451,17 @@ func (st *State) Watch(ctx context.Context, resourcePointer resource.Pointer, ch
}

// WatchKind all resources by type.
//
//nolint:gocyclo,cyclop,gocognit
func (st *State) WatchKind(ctx context.Context, resourceKind resource.Kind, ch chan<- state.Event, opts ...state.WatchKindOption) error {
return st.watchKind(ctx, resourceKind, ch, nil, opts...)
}

// WatchKindAggregated all resources by type.
func (st *State) WatchKindAggregated(ctx context.Context, resourceKind resource.Kind, ch chan<- []state.Event, opts ...state.WatchKindOption) error {
return st.watchKind(ctx, resourceKind, nil, ch, opts...)
}

//nolint:gocyclo,cyclop,gocognit,maintidx
func (st *State) watchKind(ctx context.Context, resourceKind resource.Kind, singleCh chan<- state.Event, aggCh chan<- []state.Event, opts ...state.WatchKindOption) error {
ctx = st.clearIncomingContext(ctx)

var options state.WatchKindOptions
Expand Down Expand Up @@ -510,28 +519,40 @@ func (st *State) WatchKind(ctx context.Context, resourceKind resource.Kind, ch c
go func() {
defer cancel()

// send initial contents if they were captured
for _, res := range bootstrapList {
if !channel.SendWithContext(ctx, ch,
state.Event{
Type: state.Created,
Resource: res,
},
) {
return
}
}
if options.BootstrapContents {
switch {
case singleCh != nil:
for _, res := range bootstrapList {
if !channel.SendWithContext(ctx, singleCh,
state.Event{
Type: state.Created,
Resource: res,
},
) {
return
}
}

bootstrapList = nil
if !channel.SendWithContext(ctx, singleCh, state.Event{Type: state.Bootstrapped}) {
return
}
case aggCh != nil:
events := slices.Map(bootstrapList, func(r resource.Resource) state.Event {
return state.Event{
Type: state.Created,
Resource: r,
}
})

if options.BootstrapContents {
if !channel.SendWithContext(ctx, ch,
state.Event{
Type: state.Bootstrapped,
},
) {
return
events = append(events, state.Event{Type: state.Bootstrapped})

if !channel.SendWithContext(ctx, aggCh, events) {
return
}
}

// make the list nil so that it gets GC'ed, we don't need it anymore after this point
bootstrapList = nil
}

for {
Expand All @@ -551,12 +572,17 @@ func (st *State) WatchKind(ctx context.Context, resourceKind resource.Kind, ch c
}

if watchResponse.Err() != nil {
channel.SendWithContext(ctx, ch,
state.Event{
Type: state.Errored,
Error: watchResponse.Err(),
},
)
watchErrorEvent := state.Event{
Type: state.Errored,
Error: watchResponse.Err(),
}

switch {
case singleCh != nil:
channel.SendWithContext(ctx, singleCh, watchErrorEvent)
case aggCh != nil:
channel.SendWithContext(ctx, aggCh, []state.Event{watchErrorEvent})
}

return
}
Expand All @@ -565,6 +591,8 @@ func (st *State) WatchKind(ctx context.Context, resourceKind resource.Kind, ch c
return
}

events := make([]state.Event, 0, len(watchResponse.Events))

for _, etcdEvent := range watchResponse.Events {
// watch event might come for a revision which was already sent in the bootstrapped set, ignore it
if etcdEvent.Kv != nil && etcdEvent.Kv.ModRevision <= revision {
Expand All @@ -573,12 +601,17 @@ func (st *State) WatchKind(ctx context.Context, resourceKind resource.Kind, ch c

event, err := st.convertEvent(etcdEvent)
if err != nil {
channel.SendWithContext(ctx, ch,
state.Event{
Type: state.Errored,
Error: err,
},
)
convertErrorEvent := state.Event{
Type: state.Errored,
Error: err,
}

switch {
case singleCh != nil:
channel.SendWithContext(ctx, singleCh, convertErrorEvent)
case aggCh != nil:
channel.SendWithContext(ctx, aggCh, []state.Event{convertErrorEvent})
}

return
}
Expand Down Expand Up @@ -611,9 +644,24 @@ func (st *State) WatchKind(ctx context.Context, resourceKind resource.Kind, ch c
panic("should never be reached")
}

if !channel.SendWithContext(ctx, ch, event) {
events = append(events, event)
}

if len(events) == 0 {
continue
}

switch {
case aggCh != nil:
if !channel.SendWithContext(ctx, aggCh, events) {
return
}
case singleCh != nil:
for _, event := range events {
if !channel.SendWithContext(ctx, singleCh, event) {
return
}
}
}
}
}()
Expand Down

0 comments on commit f219016

Please sign in to comment.