Skip to content

Commit

Permalink
[Linear cache] Support use of stable versions in sotw (#12)
Browse files Browse the repository at this point in the history
[Linear cache] Support use of stable versions in sotw

When using sotw watches, the current behavior of the linear cache is to use the current version of the cache (monotonically increased on resource updates) as the version returned. In the past the request-provided version was compared to the version of the last resource update to compute the returned resources, allowing watch resumption when a client would reconnect.
This behavior was actually not working as the subscribed resources could change while no cache updates occurred, or the newly requested resources were at an older cache version. PR #10 therefore no longer use this behavior to track resources to be returned, instead relying on the subscription state. A side effect is that watch resumption would always return all known resources.
Delta watches have a mechanism to avoid this issue, by tracking per resource version and sending them as part of the initial request of a new subscription. 

Sotw do not allow per resource version in requests and responses, but by encoding the current subscription state through a hash of the returned versions map, this PR now allows resumption if the hash matches the response we would otherwise return. It still has two main limitations: it is less efficient (as we compute an entire response to then not reply) and we cannot track which resource (if any) changed, and will therefore return them all if anything has changed.

Signed-off-by: Valerian Roche <[email protected]>
  • Loading branch information
valerian-roche authored and zhiyanfoo committed Apr 10, 2024
1 parent 8227356 commit 5c3a9e6
Show file tree
Hide file tree
Showing 5 changed files with 331 additions and 76 deletions.
204 changes: 132 additions & 72 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ package cache

import (
"context"
"encoding/hex"
"errors"
"fmt"
"hash/fnv"
"sort"
"strconv"
"strings"
"sync"
Expand All @@ -38,6 +41,13 @@ type cachedResource struct {
stableVersion string
}

func newCachedResource(res types.Resource, cacheVersion string) *cachedResource {
return &cachedResource{
Resource: res,
cacheVersion: cacheVersion,
}
}

func (c *cachedResource) getStableVersion() (string, error) {
if c.stableVersion != "" {
return c.stableVersion, nil
Expand Down Expand Up @@ -108,6 +118,12 @@ type LinearCache struct {
// cache instances and avoid issues of version reuse.
versionPrefix string

// useStableVersionsInSotw switches to a new version model for sotw watches.
// When activated, versions are stored in subscriptions using stable versions, and the response version
// is an hash of the returned versions to allow watch resumptions when reconnecting to the cache with a
// new subscription.
useStableVersionsInSotw bool

log log.Logger

mu sync.RWMutex
Expand All @@ -121,6 +137,8 @@ type LinearCacheOption func(*LinearCache)
// WithVersionPrefix sets a version prefix of the form "prefixN" in the version info.
// Version prefix can be used to distinguish replicated instances of the cache, in case
// a client re-connects to another instance.
// Deprecated: use WithSotwStableVersions instead to avoid issues when reconnecting to other instances
// while avoiding resending resources if unchanged.
func WithVersionPrefix(prefix string) LinearCacheOption {
return func(cache *LinearCache) {
cache.versionPrefix = prefix
Expand All @@ -144,6 +162,18 @@ func WithLogger(log log.Logger) LinearCacheOption {
}
}

// WithSotwStableVersions changes the versions returned in sotw to encode the list of resources known
// in the subscription.
// The use of stable versions for sotw also deduplicates updates to clients if the cache updates are
// not changing the content of the resource.
// When used, the use of WithVersionPrefix is no longer needed to manage reconnection to other instances
// and should not be used.
func WithSotwStableVersions() LinearCacheOption {
return func(cache *LinearCache) {
cache.useStableVersionsInSotw = true
}
}

// NewLinearCache creates a new cache. See the comments on the struct definition.
func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
out := &LinearCache{
Expand Down Expand Up @@ -250,80 +280,114 @@ func (cache *LinearCache) computeResourceChange(sub Subscription, alwaysConsider
return changedResources, removedResources, nil
}

func computeSotwStableVersion(versionMap map[string]string) string {
// To enforce a stable hash we need to have an ordered vision of the map.
keys := make([]string, 0, len(versionMap))
for key := range versionMap {
keys = append(keys, key)
}
sort.Strings(keys)

mapHasher := fnv.New64()

buffer := make([]byte, 0, 8)
itemHasher := fnv.New64()
for _, key := range keys {
buffer = buffer[:0]
itemHasher.Reset()
itemHasher.Write([]byte(key))
mapHasher.Write(itemHasher.Sum(buffer))
buffer = buffer[:0]
itemHasher.Reset()
itemHasher.Write([]byte(versionMap[key]))
mapHasher.Write(itemHasher.Sum(buffer))
}
buffer = buffer[:0]
return hex.EncodeToString(mapHasher.Sum(buffer))
}

func (cache *LinearCache) computeSotwResponse(watch ResponseWatch, alwaysConsiderAllResources bool) (*RawResponse, error) {
changedResources, removedResources, err := cache.computeResourceChange(watch.subscription, alwaysConsiderAllResources, false)
changedResources, removedResources, err := cache.computeResourceChange(watch.subscription, alwaysConsiderAllResources, cache.useStableVersionsInSotw)
if err != nil {
return nil, err
}

if len(changedResources) == 0 && len(removedResources) == 0 && !alwaysConsiderAllResources {
// Nothing changed.
return nil, nil
}

returnedVersions := make(map[string]string, len(watch.subscription.ReturnedResources()))
// Clone the current returned versions. The cache should not alter the subscription
for resourceName, version := range watch.subscription.ReturnedResources() {
returnedVersions[resourceName] = version
}

cacheVersion := cache.getVersion()
var resources []types.ResourceWithTTL
// In sotw the list of resources to actually return depends on:
// - whether the type requires full-state in each reply (lds and cds).
// - whether the request is wildcard.
// resourcesToReturn will include all the resource names to reply based on the changes detected.
var resourcesToReturn []string

switch {
// For lds and cds, answers will always include all matching resources, with no regard to which resource was changed or removed.
// For lds and cds, answers will always include all existing subscribed resources, with no regard to which resource was changed or removed.
// For other types, the response only includes updated resources (sotw cannot notify for deletion).
case !ResourceRequiresFullStateInSotw(cache.typeURL):
// changedResources is already filtered based on the subscription.
resources = make([]types.ResourceWithTTL, 0, len(changedResources))
for _, resourceName := range changedResources {
cachedResource := cache.resources[resourceName]
resources = append(resources, types.ResourceWithTTL{Resource: cachedResource.Resource})
returnedVersions[resourceName] = cachedResource.cacheVersion
if !alwaysConsiderAllResources && len(changedResources) == 0 {
// If the request is not the initial one, and the type does not require full updates,
// do not return if nothing is to be set.
// For full-state resources an empty response does have a semantic meaning.
return nil, nil
}

// changedResources is already filtered based on the subscription.
resourcesToReturn = changedResources
case watch.subscription.IsWildcard():
// Include all resources for the type.
resources = make([]types.ResourceWithTTL, 0, len(cache.resources))
for resourceName, cachedResource := range cache.resources {
resources = append(resources, types.ResourceWithTTL{Resource: cachedResource.Resource})
returnedVersions[resourceName] = cachedResource.cacheVersion
resourcesToReturn = make([]string, 0, len(cache.resources))
for resourceName := range cache.resources {
resourcesToReturn = append(resourcesToReturn, resourceName)
}
default:
// Include all resources matching the subscription, with no concern on whether
// it has been updated or not.
// Include all resources matching the subscription, with no concern on whether it has been updated or not.
requestedResources := watch.subscription.SubscribedResources()
// The linear cache could be very large (e.g. containing all potential CLAs)
// Therefore drives on the subscription requested resources.
resources = make([]types.ResourceWithTTL, 0, len(requestedResources))
resourcesToReturn = make([]string, 0, len(requestedResources))
for resourceName := range requestedResources {
cachedResource, ok := cache.resources[resourceName]
if !ok {
continue
if _, ok := cache.resources[resourceName]; ok {
resourcesToReturn = append(resourcesToReturn, resourceName)
}
resources = append(resources, types.ResourceWithTTL{Resource: cachedResource.Resource})
returnedVersions[resourceName] = cachedResource.cacheVersion
}
}

// returnedVersions includes all resources currently known to the subscription and their version.
returnedVersions := make(map[string]string, len(watch.subscription.ReturnedResources()))
// Clone the current returned versions. The cache should not alter the subscription.
for resourceName, version := range watch.subscription.ReturnedResources() {
returnedVersions[resourceName] = version
}

resources := make([]types.ResourceWithTTL, 0, len(resourcesToReturn))
for _, resourceName := range resourcesToReturn {
cachedResource := cache.resources[resourceName]
resources = append(resources, types.ResourceWithTTL{Resource: cachedResource.Resource})
version, err := cachedResource.getVersion(cache.useStableVersionsInSotw)
if err != nil {
return nil, fmt.Errorf("failed to compute version of %s: %w", resourceName, err)
}
returnedVersions[resourceName] = version
}
// Cleanup resources no longer existing in the cache or no longer subscribed.
// In sotw we cannot return those if not full state,
// but this ensures we detect unsubscription then resubscription.
for _, resourceName := range removedResources {
delete(returnedVersions, resourceName)
}

if !alwaysConsiderAllResources && !ResourceRequiresFullStateInSotw(cache.typeURL) && len(resources) == 0 {
// If the request is not the initial one, and the type does not require full updates,
// do not return if nothing is to be set.
// For full-state resources an empty response does have a semantic meaning.
return nil, nil
responseVersion := cache.getVersion()
if cache.useStableVersionsInSotw {
responseVersion = cache.versionPrefix + computeSotwStableVersion(returnedVersions)
}

return &RawResponse{
Request: watch.Request,
Resources: resources,
ReturnedResources: returnedVersions,
Version: cacheVersion,
Version: responseVersion,
Ctx: context.Background(),
}, nil
}
Expand Down Expand Up @@ -395,7 +459,7 @@ func (cache *LinearCache) notifyAll(modified []string) error {
watch.Response <- response
cache.removeWatch(watchID, watch.subscription)
} else {
cache.log.Warnf("[Linear cache] Watch %d detected as triggered but no change was found", watchID)
cache.log.Infof("[Linear cache] Watch %d detected as triggered but no change was found", watchID)
}
}

Expand All @@ -409,7 +473,7 @@ func (cache *LinearCache) notifyAll(modified []string) error {
watch.Response <- response
delete(cache.wildcardWatches.sotw, watchID)
} else {
cache.log.Warnf("[Linear cache] Wildcard watch %d detected as triggered but no change was found", watchID)
cache.log.Infof("[Linear cache] Wildcard watch %d detected as triggered but no change was found", watchID)
}
}

Expand All @@ -424,7 +488,7 @@ func (cache *LinearCache) notifyAll(modified []string) error {
watch.Response <- response
cache.removeDeltaWatch(watchID, watch.subscription)
} else {
cache.log.Warnf("[Linear cache] Delta watch %d detected as triggered but no change was found", watchID)
cache.log.Infof("[Linear cache] Delta watch %d detected as triggered but no change was found", watchID)
}
}

Expand All @@ -438,32 +502,13 @@ func (cache *LinearCache) notifyAll(modified []string) error {
watch.Response <- response
delete(cache.wildcardWatches.delta, watchID)
} else {
cache.log.Warnf("[Linear cache] Wildcard delta watch %d detected as triggered but no change was found", watchID)
cache.log.Infof("[Linear cache] Wildcard delta watch %d detected as triggered but no change was found", watchID)
}
}

return nil
}

func computeResourceStableVersion(res types.Resource) (string, error) {
// TODO(valerian-roche): store serialized resource as part of the cachedResource
// to reuse it when marshaling the responses instead of remarshaling and recomputing the version then.
marshaledResource, err := MarshalResource(res)
if err != nil {
return "", err
}
return HashResource(marshaledResource), nil
}

func (cache *LinearCache) addResourceToCache(name string, res types.Resource) error {
update := &cachedResource{
Resource: res,
cacheVersion: cache.getVersion(),
}
cache.resources[name] = update
return nil
}

// UpdateResource updates a resource in the collection.
func (cache *LinearCache) UpdateResource(name string, res types.Resource) error {
if res == nil {
Expand All @@ -473,9 +518,7 @@ func (cache *LinearCache) UpdateResource(name string, res types.Resource) error
defer cache.mu.Unlock()

cache.version++
if err := cache.addResourceToCache(name, res); err != nil {
return err
}
cache.resources[name] = newCachedResource(res, cache.getVersion())

return cache.notifyAll([]string{name})
}
Expand All @@ -499,11 +542,10 @@ func (cache *LinearCache) UpdateResources(toUpdate map[string]types.Resource, to
defer cache.mu.Unlock()

cache.version++
version := cache.getVersion()
modified := make([]string, 0, len(toUpdate)+len(toDelete))
for name, resource := range toUpdate {
if err := cache.addResourceToCache(name, resource); err != nil {
return err
}
cache.resources[name] = newCachedResource(resource, version)
modified = append(modified, name)
}
for _, name := range toDelete {
Expand All @@ -521,6 +563,7 @@ func (cache *LinearCache) SetResources(resources map[string]types.Resource) {
defer cache.mu.Unlock()

cache.version++
version := cache.getVersion()

modified := make([]string, 0, len(resources))
// Collect deleted resource names.
Expand All @@ -531,13 +574,11 @@ func (cache *LinearCache) SetResources(resources map[string]types.Resource) {
}
}

// Collect changed resource names.
// We assume all resources passed to SetResources are changed.
// Otherwise we would have to do proto.Equal on resources which is pretty expensive operation
// In delta and if stable versions are used for sotw, identical resources will not trigger watches.
// In sotw without stable versions used, all those resources will trigger watches, even if identical.
for name, resource := range resources {
if err := cache.addResourceToCache(name, resource); err != nil {
cache.log.Errorf("Failed to add resources to the cache: %s", err)
}
cache.resources[name] = newCachedResource(resource, version)
modified = append(modified, name)
}

Expand Down Expand Up @@ -598,8 +639,27 @@ func (cache *LinearCache) CreateWatch(request *Request, sub Subscription, value
if err != nil {
return nil, fmt.Errorf("failed to compute the watch respnse: %w", err)
}

shouldReply := false
if response != nil {
// If the request
// - is the first
// - provides a non-empty version, matching the version prefix
// and the cache uses stable versions, if the generated versions are the same as the previous one, we do not return the response.
// This avoids resending all data if the new subscription is just a resumption of the previous one.
if cache.useStableVersionsInSotw && request.GetResponseNonce() == "" && !ignoreCurrentSubscriptionResources {
shouldReply = request.GetVersionInfo() != response.Version

// We confirmed the content of the known resources, store them in the watch we create.
subscription := newWatchSubscription(sub)
subscription.returnedResources = response.ReturnedResources
watch.subscription = subscription
sub = subscription
} else {
shouldReply = true
}
}

if shouldReply {
cache.log.Debugf("[linear cache] replying to the watch with resources %v (subscription values %v, known %v)", response.GetReturnedResources(), sub.SubscribedResources(), sub.ReturnedResources())
watch.Response <- response
return func() {}, nil
Expand All @@ -608,7 +668,7 @@ func (cache *LinearCache) CreateWatch(request *Request, sub Subscription, value
watchID := cache.nextWatchID()
// Create open watches since versions are up to date.
if sub.IsWildcard() {
cache.log.Infof("[linear cache] open watch %d for %s all resources, system version %q", watchID, cache.typeURL, cache.getVersion())
cache.log.Infof("[linear cache] open watch %d for %s all resources, known versions %v, system version %q", watchID, cache.typeURL, sub.ReturnedResources(), cache.getVersion())
cache.wildcardWatches.sotw[watchID] = watch
return func() {
cache.mu.Lock()
Expand All @@ -617,7 +677,7 @@ func (cache *LinearCache) CreateWatch(request *Request, sub Subscription, value
}, nil
}

cache.log.Infof("[linear cache] open watch %d for %s resources %v, system version %q", watchID, cache.typeURL, sub.SubscribedResources(), cache.getVersion())
cache.log.Infof("[linear cache] open watch %d for %s resources %v, known versions %v, system version %q", watchID, cache.typeURL, sub.SubscribedResources(), sub.ReturnedResources(), cache.getVersion())
for name := range sub.SubscribedResources() {
watches, exists := cache.resourceWatches[name]
if !exists {
Expand Down
Loading

0 comments on commit 5c3a9e6

Please sign in to comment.