Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework Sotw Cache interface #6

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"google.golang.org/protobuf/types/known/durationpb"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"

discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
)
Expand All @@ -37,6 +36,34 @@ type Request = discovery.DiscoveryRequest
// DeltaRequest is an alias for the delta discovery request type.
type DeltaRequest = discovery.DeltaDiscoveryRequest

// Subscription stores the server view of the client state for a given resource type.
// This allows proper implementation of stateful aspects of the protocol (e.g. returning only some updated resources).
// Though the methods may return mutable parts of the state for performance reasons,
// the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation.
type Subscription interface {
// ReturnedResources returns a list of resources that clients have ACK'd and their associated version.
// The versions are:
// - delta protocol: version of the specific resource set in the response
// - sotw protocol: version of the global response when the resource was last ACKed
ReturnedResources() map[string]string

// SubscribedResources returns the list of resources currently subscribed to by the client for the type.
// For delta it keeps track of subscription updates across requests
// For sotw it is a normalized view of the last request resources
SubscribedResources() map[string]struct{}

// IsWildcard returns whether the client has a wildcard watch.
// This considers subtleties related to the current migration of wildcard definitions within the protocol.
// More details on the behavior of wildcard are present at https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return
IsWildcard() bool
atollena marked this conversation as resolved.
Show resolved Hide resolved

// WatchesResources returns whether at least one of the resources provided is currently being watched by the subscription.
// It is currently only applicable to delta-xds.
// If the request is wildcard, it will always return true,
// otherwise it will compare the provided resources to the list of resources currently subscribed
WatchesResources(resourceNames map[string]struct{}) bool
}

// ConfigWatcher requests watches for configuration resources by a node, last
// applied version identifier, and resource names hint. The watch should send
// the responses when they are ready. The watch can be canceled by the
Expand All @@ -54,7 +81,7 @@ type ConfigWatcher interface {
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
CreateWatch(*Request, stream.StreamState, chan Response) (cancel func())
CreateWatch(*Request, Subscription, chan Response) (cancel func(), err error)

// CreateDeltaWatch returns a new open incremental xDS watch.
// This is the entrypoint to propagate configuration changes the
Expand All @@ -66,7 +93,7 @@ type ConfigWatcher interface {
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
CreateDeltaWatch(*DeltaRequest, stream.StreamState, chan DeltaResponse) (cancel func())
CreateDeltaWatch(*DeltaRequest, Subscription, chan DeltaResponse) (cancel func(), err error)
}

// ConfigFetcher fetches configuration resources from cache
Expand Down
15 changes: 7 additions & 8 deletions pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

// groups together resource-related arguments for the createDeltaResponse function
Expand All @@ -28,7 +27,7 @@ type resourceContainer struct {
systemVersion string
}

func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, resources resourceContainer) *RawDeltaResponse {
func createDeltaResponse(ctx context.Context, req *DeltaRequest, state Subscription, resources resourceContainer) *RawDeltaResponse {
// variables to build our response with
var nextVersionMap map[string]string
var filtered []types.Resource
Expand All @@ -37,7 +36,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
// If we are handling a wildcard request, we want to respond with all resources
switch {
case state.IsWildcard():
if len(state.GetResourceVersions()) == 0 {
if len(state.ReturnedResources()) == 0 {
filtered = make([]types.Resource, 0, len(resources.resourceMap))
}
nextVersionMap = make(map[string]string, len(resources.resourceMap))
Expand All @@ -46,25 +45,25 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
// we can just set it here to be used for comparison later
version := resources.versionMap[name]
nextVersionMap[name] = version
prevVersion, found := state.GetResourceVersions()[name]
prevVersion, found := state.ReturnedResources()[name]
if !found || (prevVersion != version) {
filtered = append(filtered, r)
}
}

// Compute resources for removal
// The resource version can be set to "" here to trigger a removal even if never returned before
for name := range state.GetResourceVersions() {
for name := range state.ReturnedResources() {
if _, ok := resources.resourceMap[name]; !ok {
toRemove = append(toRemove, name)
}
}
default:
nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames()))
nextVersionMap = make(map[string]string, len(state.SubscribedResources()))
// state.GetResourceVersions() may include resources no longer subscribed
// In the current code this gets silently cleaned when updating the version map
for name := range state.GetSubscribedResourceNames() {
prevVersion, found := state.GetResourceVersions()[name]
for name := range state.SubscribedResources() {
prevVersion, found := state.ReturnedResources()[name]
if r, ok := resources.resourceMap[name]; ok {
nextVersion := resources.versionMap[name]
if prevVersion != nextVersion {
Expand Down
103 changes: 56 additions & 47 deletions pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package cache_test
import (
"context"
"fmt"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -36,13 +35,15 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
// Make our initial request as a wildcard to get all resources and make sure the wildcard requesting works as intended
for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
sub := stream.NewSubscription(true, nil)
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, stream.NewStreamState(true, nil), watches[typ])
}, sub, watches[typ])
require.NoError(t, err)
}

if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil {
Expand All @@ -68,17 +69,20 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
// all resources as well as individual resource removals
for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
state := stream.NewStreamState(false, versionMap[typ])
sub := stream.NewSubscription(false, versionMap[typ])
resources := []string{}
for resource := range versionMap[typ] {
state.GetSubscribedResourceNames()[resource] = struct{}{}
resources = append(resources, resource)
}
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
sub.SetResourceSubscription(resources)
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, state, watches[typ])
}, sub, watches[typ])
require.NoError(t, err)
}

if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) {
Expand Down Expand Up @@ -111,36 +115,40 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
func TestDeltaRemoveResources(t *testing.T) {
c := cache.NewSnapshotCache(false, group{}, logger{t: t})
watches := make(map[string]chan cache.DeltaResponse)
streams := make(map[string]*stream.StreamState)
subs := make(map[string]*stream.Subscription)

// At this stage the cache is empty, so a watch is opened
for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
state := stream.NewStreamState(true, make(map[string]string))
streams[typ] = &state
sub := stream.NewSubscription(true, make(map[string]string))
subs[typ] = &sub
// We don't specify any resource name subscriptions here because we want to make sure we test wildcard
// functionality. This means we should receive all resources back without requesting a subscription by name.
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
}, *streams[typ], watches[typ])
}, *subs[typ], watches[typ])
require.NoError(t, err)
}

if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil {
t.Fatal(err)
}
snapshot := fixture.snapshot()
snapshot.Resources[types.Endpoint] = cache.NewResources(fixture.version, []types.Resource{
testEndpoint,
resource.MakeEndpoint("otherCluster", 8080),
})
require.NoError(t, c.SetSnapshot(context.Background(), key, snapshot))

for _, typ := range testTypes {
t.Run(typ, func(t *testing.T) {
select {
case out := <-watches[typ]:
snapshot := fixture.snapshot()
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ))
nextVersionMap := out.GetNextVersionMap()
streams[typ].SetResourceVersions(nextVersionMap)
subs[typ].SetReturnedResources(nextVersionMap)
case <-time.After(time.Second):
t.Fatal("failed to receive a snapshot response")
require.Fail(t, "failed to receive a snapshot response")
}
})
}
Expand All @@ -149,39 +157,35 @@ func TestDeltaRemoveResources(t *testing.T) {
// test the removal of certain resources from a partial snapshot
for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
}, *streams[typ], watches[typ])
TypeUrl: typ,
ResponseNonce: "nonce",
}, *subs[typ], watches[typ])
require.NoError(t, err)
}

if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) {
t.Errorf("watches should be created for the latest version, saw %d watches expected %d", count, len(testTypes))
}
assert.Equal(t, len(testTypes), c.GetStatusInfo(key).GetNumDeltaWatches(), "watches should be created for the latest version")

// set a partially versioned snapshot with no endpoints
// set a partially versioned snapshot with only one endpoint
snapshot2 := fixture.snapshot()
snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{})
if err := c.SetSnapshot(context.Background(), key, snapshot2); err != nil {
t.Fatal(err)
}
snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{
testEndpoint, // this cluster is not changed, we do not expect it back in "resources"
})
require.NoError(t, c.SetSnapshot(context.Background(), key, snapshot2))

// validate response for endpoints
select {
case out := <-watches[testTypes[0]]:
snapshot2 := fixture.snapshot()
snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{})
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResources(rsrc.EndpointType))
assert.Empty(t, out.(*cache.RawDeltaResponse).Resources)
assert.Equal(t, []string{"otherCluster"}, out.(*cache.RawDeltaResponse).RemovedResources)
nextVersionMap := out.GetNextVersionMap()

// make sure the version maps are different since we no longer are tracking any endpoint resources
if reflect.DeepEqual(streams[testTypes[0]].GetResourceVersions(), nextVersionMap) {
t.Fatalf("versionMap for the endpoint resource type did not change, received: %v, instead of an empty map", nextVersionMap)
}
assert.NotEqual(t, nextVersionMap, subs[testTypes[0]].ReturnedResources(), "versionMap for the endpoint resource type did not change")
case <-time.After(time.Second):
t.Fatal("failed to receive snapshot response")
assert.Fail(t, "failed to receive snapshot response")
}
}

Expand All @@ -204,13 +208,15 @@ func TestConcurrentSetDeltaWatch(t *testing.T) {
t.Fatalf("snapshot failed: %s", err)
}
} else {
cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
sub := stream.NewSubscription(false, make(map[string]string))
cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: id,
},
TypeUrl: rsrc.EndpointType,
ResourceNamesSubscribe: []string{clusterName},
}, stream.NewStreamState(false, make(map[string]string)), responses)
}, sub, responses)
require.NoError(t, err)

defer cancel()
}
Expand All @@ -226,22 +232,23 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {

// Create a non-buffered channel that will block sends.
watchCh := make(chan cache.DeltaResponse)
state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{names[rsrc.EndpointType][0]: {}})
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
sub := stream.NewSubscription(false, nil)
sub.UpdateResourceSubscriptions([]string{names[rsrc.EndpointType][0]}, nil)
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: key,
},
TypeUrl: rsrc.EndpointType,
ResourceNamesSubscribe: names[rsrc.EndpointType],
}, state, watchCh)
}, sub, watchCh)
require.NoError(t, err)

// The first time we set the snapshot without consuming from the blocking channel, so this should time out.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()

err := c.SetSnapshot(ctx, key, fixture.snapshot())
require.EqualError(t, err, context.Canceled.Error())
err = c.SetSnapshot(ctx, key, fixture.snapshot())
assert.EqualError(t, err, context.Canceled.Error())

// Now reset the snapshot with a consuming channel. This verifies that if setting the snapshot fails,
// we can retry by setting the same snapshot. In other words, we keep the watch open even if we failed
Expand Down Expand Up @@ -270,13 +277,15 @@ func TestSnapshotCacheDeltaWatchCancel(t *testing.T) {
c := cache.NewSnapshotCache(true, group{}, logger{t: t})
for _, typ := range testTypes {
responses := make(chan cache.DeltaResponse, 1)
cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
sub := stream.NewSubscription(false, make(map[string]string))
cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: key,
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, stream.NewStreamState(false, make(map[string]string)), responses)
}, sub, responses)
require.NoError(t, err)

// Cancel the watch
cancel()
Expand Down
Loading