diff --git a/lib/services/local/connection_diagnostic.go b/lib/services/local/connection_diagnostic.go index 645318152b4c3..5ae2c11eb649d 100644 --- a/lib/services/local/connection_diagnostic.go +++ b/lib/services/local/connection_diagnostic.go @@ -85,7 +85,6 @@ func (s *ConnectionDiagnosticService) UpdateConnectionDiagnostic(ctx context.Con } // AppendDiagnosticTrace adds a Trace into the ConnectionDiagnostics. -// It does a CompareAndSwap to ensure atomicity. func (s *ConnectionDiagnosticService) AppendDiagnosticTrace(ctx context.Context, name string, t *types.ConnectionDiagnosticTrace) (types.ConnectionDiagnostic, error) { existing, err := s.Get(ctx, backend.NewKey(connectionDiagnosticPrefix, name)) if err != nil { @@ -115,7 +114,7 @@ func (s *ConnectionDiagnosticService) AppendDiagnosticTrace(ctx context.Context, Revision: existing.Revision, } - _, err = s.CompareAndSwap(ctx, *existing, newItem) + _, err = s.ConditionalUpdate(ctx, newItem) if err != nil { return nil, trace.Wrap(err) } diff --git a/lib/services/local/dynamic_access.go b/lib/services/local/dynamic_access.go index 8f61c378bbff5..485c12f8c0c36 100644 --- a/lib/services/local/dynamic_access.go +++ b/lib/services/local/dynamic_access.go @@ -135,7 +135,7 @@ func (s *DynamicAccessService) SetAccessRequestState(ctx context.Context, params if err != nil { return nil, trace.Wrap(err) } - if _, err := s.CompareAndSwap(ctx, *item, newItem); err != nil { + if _, err := s.ConditionalUpdate(ctx, newItem); err != nil { if trace.IsCompareFailed(err) { select { case <-retry.After(): @@ -195,7 +195,7 @@ func (s *DynamicAccessService) ApplyAccessReview(ctx context.Context, params typ if err != nil { return nil, trace.Wrap(err) } - if _, err := s.CompareAndSwap(ctx, *item, newItem); err != nil { + if _, err := s.ConditionalUpdate(ctx, newItem); err != nil { if trace.IsCompareFailed(err) { select { case <-retry.After(): @@ -411,10 +411,8 @@ func (s *DynamicAccessService) CreateAccessRequestAllowedPromotions(ctx context. if err != nil { return trace.Wrap(err) } - // Currently, this logic is used only internally (no API exposed), and - // there is only one place that calls it. If this ever changes, we will - // need to do a CompareAndSwap here. - if _, err := s.Put(ctx, item); err != nil { + + if _, err := s.Create(ctx, item); err != nil { return trace.Wrap(err) } return nil diff --git a/lib/services/local/plugin_data.go b/lib/services/local/plugin_data.go index 34cc5934fc774..591b85e124aa1 100644 --- a/lib/services/local/plugin_data.go +++ b/lib/services/local/plugin_data.go @@ -198,7 +198,7 @@ func (p *PluginDataService) updatePluginData(ctx context.Context, params types.P return trace.Wrap(err) } } else { - if _, err := p.CompareAndSwap(ctx, *item, newItem); err != nil { + if _, err := p.ConditionalUpdate(ctx, newItem); err != nil { if trace.IsCompareFailed(err) { select { case <-retry.After(): diff --git a/lib/services/local/plugins.go b/lib/services/local/plugins.go index 4f2bcfe7a1fa2..10a502e5382a7 100644 --- a/lib/services/local/plugins.go +++ b/lib/services/local/plugins.go @@ -246,7 +246,7 @@ func (s *PluginsService) updateAndSwap(ctx context.Context, name string, modify return trace.Wrap(err) } - _, err = s.backend.CompareAndSwap(ctx, *item, backend.Item{ + _, err = s.backend.ConditionalUpdate(ctx, backend.Item{ Key: backend.NewKey(pluginsPrefix, plugin.GetName()), Value: value, Expires: plugin.Expiry(), diff --git a/lib/services/local/presence.go b/lib/services/local/presence.go index e149abd09b362..0e2edfc9304a4 100644 --- a/lib/services/local/presence.go +++ b/lib/services/local/presence.go @@ -593,7 +593,7 @@ func (s *PresenceService) acquireSemaphore(ctx context.Context, key backend.Key, if err != nil { return nil, trace.Wrap(err) } - sem, err := services.UnmarshalSemaphore(item.Value) + sem, err := services.UnmarshalSemaphore(item.Value, services.WithRevision(item.Revision)) if err != nil { return nil, trace.Wrap(err) } @@ -618,7 +618,7 @@ func (s *PresenceService) acquireSemaphore(ctx context.Context, key backend.Key, Revision: rev, } - if _, err := s.CompareAndSwap(ctx, *item, newItem); err != nil { + if _, err := s.ConditionalUpdate(ctx, newItem); err != nil { return nil, trace.Wrap(err) } return lease, nil @@ -644,7 +644,7 @@ func (s *PresenceService) KeepAliveSemaphoreLease(ctx context.Context, lease typ return trace.Wrap(err) } - sem, err := services.UnmarshalSemaphore(item.Value) + sem, err := services.UnmarshalSemaphore(item.Value, services.WithRevision(item.Revision)) if err != nil { return trace.Wrap(err) } @@ -668,7 +668,7 @@ func (s *PresenceService) KeepAliveSemaphoreLease(ctx context.Context, lease typ Revision: rev, } - _, err = s.CompareAndSwap(ctx, *item, newItem) + _, err = s.ConditionalUpdate(ctx, newItem) if err != nil { if trace.IsCompareFailed(err) { return trace.CompareFailed("semaphore %v/%v has been concurrently updated, try again", sem.GetSubKind(), sem.GetName()) @@ -708,7 +708,7 @@ func (s *PresenceService) CancelSemaphoreLease(ctx context.Context, lease types. return trace.Wrap(err) } - sem, err := services.UnmarshalSemaphore(item.Value) + sem, err := services.UnmarshalSemaphore(item.Value, services.WithRevision(item.Revision)) if err != nil { return trace.Wrap(err) } @@ -730,7 +730,7 @@ func (s *PresenceService) CancelSemaphoreLease(ctx context.Context, lease types. Revision: rev, } - _, err = s.CompareAndSwap(ctx, *item, newItem) + _, err = s.ConditionalUpdate(ctx, newItem) switch { case err == nil: return nil diff --git a/lib/services/local/sessiontracker.go b/lib/services/local/sessiontracker.go index b8906b02d04ad..ad6fc5e9d06f1 100644 --- a/lib/services/local/sessiontracker.go +++ b/lib/services/local/sessiontracker.go @@ -32,11 +32,11 @@ import ( ) const ( - sessionPrefix = "session_tracker" - retryDelay = time.Second - terminatedTTL = 3 * time.Minute - casRetryLimit = 7 - casErrorMessage = "CompareAndSwap reached retry limit" + sessionPrefix = "session_tracker" + retryDelay = time.Second + terminatedTTL = 3 * time.Minute + updateRetryLimit = 7 + updateRetryLimitMessage = "Update retry limit reached" ) type sessionTracker struct { @@ -63,7 +63,7 @@ func (s *sessionTracker) loadSession(ctx context.Context, sessionID string) (typ // UpdatePresence updates the presence status of a user in a session. func (s *sessionTracker) UpdatePresence(ctx context.Context, sessionID, user string) error { - for i := 0; i < casRetryLimit; i++ { + for i := 0; i < updateRetryLimit; i++ { sessionItem, err := s.bk.Get(ctx, backend.NewKey(sessionPrefix, sessionID)) if err != nil { return trace.Wrap(err) @@ -89,7 +89,7 @@ func (s *sessionTracker) UpdatePresence(ctx context.Context, sessionID, user str Expires: session.Expiry(), Revision: sessionItem.Revision, } - _, err = s.bk.CompareAndSwap(ctx, *sessionItem, item) + _, err = s.bk.ConditionalUpdate(ctx, item) if trace.IsCompareFailed(err) { select { case <-ctx.Done(): @@ -102,7 +102,7 @@ func (s *sessionTracker) UpdatePresence(ctx context.Context, sessionID, user str return trace.Wrap(err) } - return trace.CompareFailed(casErrorMessage) + return trace.CompareFailed(updateRetryLimitMessage) } // GetSessionTracker returns the current state of a session tracker for an active session. @@ -202,7 +202,7 @@ func (s *sessionTracker) CreateSessionTracker(ctx context.Context, tracker types // UpdateSessionTracker updates a tracker resource for an active session. func (s *sessionTracker) UpdateSessionTracker(ctx context.Context, req *proto.UpdateSessionTrackerRequest) error { - for i := 0; i < casRetryLimit; i++ { + for i := 0; i < updateRetryLimit; i++ { sessionItem, err := s.bk.Get(ctx, backend.NewKey(sessionPrefix, req.SessionID)) if err != nil { return trace.Wrap(err) @@ -268,7 +268,7 @@ func (s *sessionTracker) UpdateSessionTracker(ctx context.Context, req *proto.Up Expires: expiry, Revision: sessionItem.Revision, } - _, err = s.bk.CompareAndSwap(ctx, *sessionItem, item) + _, err = s.bk.ConditionalUpdate(ctx, item) if trace.IsCompareFailed(err) { select { case <-ctx.Done(): @@ -281,7 +281,7 @@ func (s *sessionTracker) UpdateSessionTracker(ctx context.Context, req *proto.Up return trace.Wrap(err) } - return trace.CompareFailed(casErrorMessage) + return trace.CompareFailed(updateRetryLimitMessage) } // RemoveSessionTracker removes a tracker resource for an active session. diff --git a/lib/services/local/unstable.go b/lib/services/local/unstable.go index de3731fff13d7..15643e4e73b47 100644 --- a/lib/services/local/unstable.go +++ b/lib/services/local/unstable.go @@ -76,7 +76,8 @@ func (s UnstableService) AssertSystemRole(ctx context.Context, req proto.SystemR Expires: time.Now().Add(assertionTTL).UTC(), } if item != nil { - _, err = s.CompareAndSwap(ctx, *item, newItem) + newItem.Revision = item.Revision + _, err = s.ConditionalUpdate(ctx, newItem) if trace.IsCompareFailed(err) { // nodes are expected to perform assertions sequentially return trace.CompareFailed("system role assertion set was concurrently modified (this is bug)")