Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework Sotw Cache interface #7

Closed
Prev Previous commit
Next Next commit
Improve comments based on PR review on envoyproxy#583
Rename KnownResources to ACKedResources to better reflect the change

Signed-off-by: Valerian Roche <valerian.roche@datadoghq.com>
  • Loading branch information
valerian-roche committed Jan 5, 2024

Verified

This commit was signed with the committer’s verified signature. The key has been revoked.
valerian-roche Valerian Roche
commit cefeb71c3e57f5639e272f967d96f46e44c0fba3
10 changes: 5 additions & 5 deletions pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
@@ -36,16 +36,16 @@ type Request = discovery.DiscoveryRequest
// DeltaRequest is an alias for the delta discovery request type.
type DeltaRequest = discovery.DeltaDiscoveryRequest

// SubscriptionState provides additional data on the client knowledge for the type matching the request
// This allows proper implementation of stateful aspects of the protocol (e.g. returning only some updated resources)
// SubscriptionState stores the server view of the client state for a given resource type.
// This allows proper implementation of stateful aspects of the protocol (e.g. returning only some updated resources).
// Though the methods may return mutable parts of the state for performance reasons,
// the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation
// the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation.
type SubscriptionState interface {
// GetKnownResources returns a list of resources that the client has ACK'd and their associated version.
// GetACKedResources returns a list of resources that the client has ACK'd and their associated version.
// The versions are:
// - delta protocol: version of the specific resource set in the response
// - sotw protocol: version of the global response when the resource was last ACKed
GetKnownResources() map[string]string
GetACKedResources() map[string]string

// GetSubscribedResources returns the list of resources currently subscribed to by the client for the type.
// For delta it keeps track of subscription updates across requests
8 changes: 4 additions & 4 deletions pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state Subscript
// If we are handling a wildcard request, we want to respond with all resources
switch {
case state.IsWildcard():
if len(state.GetKnownResources()) == 0 {
if len(state.GetACKedResources()) == 0 {
filtered = make([]types.Resource, 0, len(resources.resourceMap))
}
nextVersionMap = make(map[string]string, len(resources.resourceMap))
@@ -45,15 +45,15 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state Subscript
// we can just set it here to be used for comparison later
version := resources.versionMap[name]
nextVersionMap[name] = version
prevVersion, found := state.GetKnownResources()[name]
prevVersion, found := state.GetACKedResources()[name]
if !found || (prevVersion != version) {
filtered = append(filtered, r)
}
}

// Compute resources for removal
// The resource version can be set to "" here to trigger a removal even if never returned before
for name := range state.GetKnownResources() {
for name := range state.GetACKedResources() {
if _, ok := resources.resourceMap[name]; !ok {
toRemove = append(toRemove, name)
}
@@ -63,7 +63,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state Subscript
// state.GetResourceVersions() may include resources no longer subscribed
// In the current code this gets silently cleaned when updating the version map
for name := range state.GetSubscribedResources() {
prevVersion, found := state.GetKnownResources()[name]
prevVersion, found := state.GetACKedResources()[name]
if r, ok := resources.resourceMap[name]; ok {
nextVersion := resources.versionMap[name]
if prevVersion != nextVersion {
4 changes: 2 additions & 2 deletions pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
@@ -144,7 +144,7 @@ func TestDeltaRemoveResources(t *testing.T) {
case out := <-watches[typ]:
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ))
nextVersionMap := out.GetNextVersionMap()
streams[typ].SetKnownResources(nextVersionMap)
streams[typ].SetACKedResources(nextVersionMap)
case <-time.After(time.Second):
require.Fail(t, "failed to receive a snapshot response")
}
@@ -181,7 +181,7 @@ func TestDeltaRemoveResources(t *testing.T) {
assert.Equal(t, []string{"otherCluster"}, out.(*cache.RawDeltaResponse).RemovedResources)
nextVersionMap := out.GetNextVersionMap()
// make sure the version maps are different since we no longer are tracking any endpoint resources
assert.NotEqual(t, nextVersionMap, streams[testTypes[0]].GetKnownResources(), "versionMap for the endpoint resource type did not change")
assert.NotEqual(t, nextVersionMap, streams[testTypes[0]].GetACKedResources(), "versionMap for the endpoint resource type did not change")
case <-time.After(time.Second):
assert.Fail(t, "failed to receive snapshot response")
}
10 changes: 5 additions & 5 deletions pkg/cache/v3/linear_test.go
Original file line number Diff line number Diff line change
@@ -195,7 +195,7 @@ func createWildcardDeltaWatch(c *LinearCache, w chan DeltaResponse) error {
return err
}
resp := <-w
state.SetKnownResources(resp.GetNextVersionMap())
state.SetACKedResources(resp.GetNextVersionMap())
_, err := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w) // Ensure the watch is set properly with cache values
return err
}
@@ -674,7 +674,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil)
checkVersionMapSet(t, c)
assert.Equal(t, 2, c.NumResources())
state.SetKnownResources(resp.GetNextVersionMap())
state.SetACKedResources(resp.GetNextVersionMap())

// Multiple updates
_, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
@@ -695,7 +695,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil)
checkVersionMapSet(t, c)
assert.Equal(t, 2, c.NumResources())
state.SetKnownResources(resp.GetNextVersionMap())
state.SetACKedResources(resp.GetNextVersionMap())

// Update/add/delete
_, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
@@ -715,7 +715,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}}, []string{"b"})
checkVersionMapSet(t, c)
assert.Equal(t, 2, c.NumResources())
state.SetKnownResources(resp.GetNextVersionMap())
state.SetACKedResources(resp.GetNextVersionMap())

// Re-add previously deleted watched resource
_, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, &state, w)
@@ -732,7 +732,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
validateDeltaResponse(t, resp, []resourceInfo{{"b", hashB}}, nil) // d is not watched and should not be returned
checkVersionMapSet(t, c)
assert.Equal(t, 2, c.NumResources())
state.SetKnownResources(resp.GetNextVersionMap())
state.SetACKedResources(resp.GetNextVersionMap())

// Wildcard create/update
require.NoError(t, createWildcardDeltaWatch(c, w))
2 changes: 1 addition & 1 deletion pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
@@ -408,7 +408,7 @@ func (cache *snapshotCache) CreateWatch(request *Request, clientState Subscripti
}

if exists {
knownResourceNames := clientState.GetKnownResources()
knownResourceNames := clientState.GetACKedResources()
diff := []string{}
for _, r := range request.GetResourceNames() {
if _, ok := knownResourceNames[r]; !ok {
10 changes: 5 additions & 5 deletions pkg/cache/v3/simple_test.go
Original file line number Diff line number Diff line change
@@ -149,7 +149,7 @@ func TestSnapshotCacheWithTTL(t *testing.T) {
}
// Update streamState
for _, resource := range out.GetRequest().GetResourceNames() {
streamState.GetKnownResources()[resource] = fixture.version
streamState.GetACKedResources()[resource] = fixture.version
}
case <-time.After(2 * time.Second):
t.Errorf("failed to receive snapshot response")
@@ -189,7 +189,7 @@ func TestSnapshotCacheWithTTL(t *testing.T) {
updatesByType[typ]++

for _, resource := range out.GetRequest().GetResourceNames() {
streamState.GetKnownResources()[resource] = fixture.version
streamState.GetACKedResources()[resource] = fixture.version
}
case <-end:
cancel()
@@ -321,7 +321,7 @@ func TestSnapshotCacheWatch(t *testing.T) {
t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTTL(typ))
}
for _, resource := range out.GetRequest().GetResourceNames() {
streamState.GetKnownResources()[resource] = fixture.version
streamState.GetACKedResources()[resource] = fixture.version
}
case <-time.After(time.Second):
t.Fatal("failed to receive snapshot response")
@@ -501,7 +501,7 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) {
// Request additional resource with name=clusterName2 for same version
go func() {
state := stream.NewSubscriptionState(false, map[string]string{})
state.SetKnownResources(map[string]string{clusterName: fixture.version})
state.SetACKedResources(map[string]string{clusterName: fixture.version})
_, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version,
ResourceNames: []string{clusterName, clusterName2}}, &state, watch)
require.NoError(t, err)
@@ -521,7 +521,7 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) {

// Repeat request for with same version and make sure a watch is created
state := stream.NewSubscriptionState(false, map[string]string{})
state.SetKnownResources(map[string]string{clusterName: fixture.version, clusterName2: fixture.version})
state.SetACKedResources(map[string]string{clusterName: fixture.version, clusterName2: fixture.version})
if cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version,
ResourceNames: []string{clusterName, clusterName2}}, &state, watch); cancel == nil {
t.Fatal("Should create a watch")
4 changes: 2 additions & 2 deletions pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
@@ -118,7 +118,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
watch := watches.deltaWatches[typ]
watch.nonce = nonce

watch.state.SetKnownResources(resp.GetNextVersionMap())
watch.state.SetACKedResources(resp.GetNextVersionMap())
watches.deltaWatches[typ] = watch
return nil
}
@@ -285,7 +285,7 @@ func (s *server) unsubscribe(resources []string, streamState *stream.Subscriptio
// To achieve that, we mark the resource as having been returned with an empty version. While creating the response, the cache will either:
// * detect the version change, and return the resource (as an update)
// * detect the resource deletion, and set it as removed in the response
streamState.GetKnownResources()[resource] = ""
streamState.GetACKedResources()[resource] = ""
}
delete(sv, resource)
}
6 changes: 3 additions & 3 deletions pkg/server/sotw/v3/ads.go
Original file line number Diff line number Diff line change
@@ -103,7 +103,7 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe
}

typeURL := req.GetTypeUrl()
streamState, ok := sw.streamStates[typeURL]
streamState, ok := sw.streamState[typeURL]
if !ok {
// Supports legacy wildcard mode
// Wildcard will be set to true if no resource is set
@@ -114,7 +114,7 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe
if lastResponse, ok := sw.lastDiscoveryResponses[typeURL]; ok {
if lastResponse.nonce == "" || lastResponse.nonce == nonce {
// Let's record Resource names that a client has received.
streamState.SetKnownResources(lastResponse.resources)
streamState.SetACKedResources(lastResponse.resources)
}
}

@@ -157,7 +157,7 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe
})
}

sw.streamStates[req.TypeUrl] = streamState
sw.streamState[req.TypeUrl] = streamState
}
}
}
2 changes: 1 addition & 1 deletion pkg/server/sotw/v3/server.go
Original file line number Diff line number Diff line change
@@ -91,7 +91,7 @@ type streamWrapper struct {

// The below fields are used for tracking resource
// cache state and should be maintained per stream.
streamStates map[string]stream.SubscriptionState
streamState map[string]stream.SubscriptionState
lastDiscoveryResponses map[string]lastDiscoveryResponse
}

8 changes: 4 additions & 4 deletions pkg/server/sotw/v3/xds.go
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque

// a collection of stack allocated watches per request type.
watches: newWatches(),
streamStates: make(map[string]stream.SubscriptionState),
streamState: make(map[string]stream.SubscriptionState),
lastDiscoveryResponses: make(map[string]lastDiscoveryResponse),
}

@@ -110,7 +110,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque
req.TypeUrl = defaultTypeURL
}

streamState := sw.streamStates[req.TypeUrl]
streamState := sw.streamState[req.TypeUrl]

if s.callbacks != nil {
if err := s.callbacks.OnStreamRequest(sw.ID, req); err != nil {
@@ -121,7 +121,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque
if lastResponse, ok := sw.lastDiscoveryResponses[req.GetTypeUrl()]; ok {
if lastResponse.nonce == "" || lastResponse.nonce == nonce {
// Let's record Resource names that a client has received.
streamState.SetKnownResources(lastResponse.resources)
streamState.SetACKedResources(lastResponse.resources)
}
}

@@ -157,7 +157,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque
})
}

sw.streamStates[req.TypeUrl] = streamState
sw.streamState[req.TypeUrl] = streamState

// Recompute the dynamic select cases for this stream.
sw.watches.recompute(s.ctx, reqCh)
27 changes: 13 additions & 14 deletions pkg/server/stream/v3/subscription.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,28 @@
package stream

// SubscriptionState will keep track of a resource subscription on a stream.
// SubscriptionState stores the server view of a given type subscription in a stream.
type SubscriptionState struct {
// wildcard is set if the subscription currently has a wildcard watch
// wildcard indicates if the subscription currently has a wildcard watch.
wildcard bool

// subscribedResourceNames provides the resources explicitly requested by the client
// This list might be non-empty even when set as wildcard
// This list might be non-empty even when set as wildcard.
subscribedResourceNames map[string]struct{}

// resourceVersions contains the resources acknowledged by the client and the versions
// associated to them
resourceVersions map[string]string
// ackedResources contains the resources acknowledged by the client and the acknowledged versions.
ackedResources map[string]string
}

// NewSubscriptionState initializes a stream state.
func NewSubscriptionState(wildcard bool, initialResourceVersions map[string]string) SubscriptionState {
state := SubscriptionState{
wildcard: wildcard,
subscribedResourceNames: map[string]struct{}{},
resourceVersions: initialResourceVersions,
ackedResources: initialResourceVersions,
}

if initialResourceVersions == nil {
state.resourceVersions = make(map[string]string)
state.ackedResources = make(map[string]string)
}

return state
@@ -43,16 +42,16 @@ func (s *SubscriptionState) SetSubscribedResources(subscribedResourceNames map[s
s.subscribedResourceNames = subscribedResourceNames
}

// GetKnownResources returns the list of resources acknowledged by the client
// GetACKedResources returns the list of resources acknowledged by the client
// and their acknowledged version
func (s SubscriptionState) GetKnownResources() map[string]string {
return s.resourceVersions
func (s SubscriptionState) GetACKedResources() map[string]string {
return s.ackedResources
}

// SetKnownResources sets a list of resource versions currently known by the client
// SetACKedResources sets a list of resource versions currently known by the client
// The cache can use this state to compute resources added/updated/deleted
func (s *SubscriptionState) SetKnownResources(resourceVersions map[string]string) {
s.resourceVersions = resourceVersions
func (s *SubscriptionState) SetACKedResources(resourceVersions map[string]string) {
s.ackedResources = resourceVersions
}

// SetWildcard will set the subscription to return all known resources
8 changes: 4 additions & 4 deletions pkg/server/v3/delta_test.go
Original file line number Diff line number Diff line change
@@ -37,7 +37,7 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR
// If we are handling a wildcard request, we want to respond with all resources
switch {
case state.IsWildcard():
if len(state.GetKnownResources()) == 0 {
if len(state.GetACKedResources()) == 0 {
filtered = make([]types.Resource, 0, len(resourceMap))
}
nextVersionMap = make(map[string]string, len(resourceMap))
@@ -46,14 +46,14 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR
// we can just set it here to be used for comparison later
version := versionMap[name]
nextVersionMap[name] = version
prevVersion, found := state.GetKnownResources()[name]
prevVersion, found := state.GetACKedResources()[name]
if !found || (prevVersion != version) {
filtered = append(filtered, r)
}
}

// Compute resources for removal
for name := range state.GetKnownResources() {
for name := range state.GetACKedResources() {
if _, ok := resourceMap[name]; !ok {
toRemove = append(toRemove, name)
}
@@ -63,7 +63,7 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR
// state.GetResourceVersions() may include resources no longer subscribed
// In the current code this gets silently cleaned when updating the version map
for name := range state.GetSubscribedResources() {
prevVersion, found := state.GetKnownResources()[name]
prevVersion, found := state.GetACKedResources()[name]
if r, ok := resourceMap[name]; ok {
nextVersion := versionMap[name]
if prevVersion != nextVersion {