diff --git a/pkg/kv/util/weak_owner.go b/pkg/kv/util/weak_owner.go index a4f310e5b54..ed6357c1924 100644 --- a/pkg/kv/util/weak_owner.go +++ b/pkg/kv/util/weak_owner.go @@ -7,6 +7,7 @@ import ( "math/rand" "time" + nanoid "github.com/matoous/go-nanoid/v2" "github.com/treeverse/lakefs/pkg/kv" "github.com/treeverse/lakefs/pkg/logging" timestamppb "google.golang.org/protobuf/types/known/timestamppb" @@ -244,10 +245,44 @@ func (w *WeakOwner) startOwningKey(ctx context.Context, owner string, prefixedKe } } -// Own blocks until it gets weak ownership of key for owner. Ownership -// will be refreshed at resolution interval. It returns a function to stop -// owning key. +// releaseIf releases prefixedKey if it has the owner. +func (w *WeakOwner) releaseIf(ctx context.Context, owner string, prefixedKey []byte) error { + log := w.Log.WithContext(ctx).WithFields(logging.Fields{ + "prefixed_key": string(prefixedKey), + "owner": owner, + }) + ownership := WeakOwnership{} + predicate, err := kv.GetMsg(ctx, w.Store, weakOwnerPartition, prefixedKey, &ownership) + if err != nil { + return fmt.Errorf("get ownership message %s to release it from %s: %w", string(prefixedKey), owner, err) + } + log = log.WithField("new_owner", ownership.Owner) + + if ownership.Owner != owner { + log.Info("Lost ownership race before trying to release") + return nil + } + ownership.Owner = fmt.Sprintf("[released] %s", ownership.Owner) + // Set expiration to the beginning of time - definitely expired. + ownership.Expires.Reset() + err = kv.SetMsgIf(ctx, w.Store, weakOwnerPartition, prefixedKey, &ownership, predicate) + if errors.Is(err, kv.ErrPredicateFailed) { + log.WithFields(logging.Fields{ + "prefixed_key": string(prefixedKey), + "owner": owner, + "new_owner": ownership.Owner, + }).Info("Lost ownership race while trying to release") + // Succeeded: we did not + return nil + } + return err +} + +// Own blocks until it gets weak ownership of key for owner. Ownership will be refreshed at +// resolution interval. It returns a function to stop owning key. Own appends its random slug to +// owner, to identify the owner uniquely. func (w *WeakOwner) Own(ctx context.Context, owner, key string) (func(), error) { + owner = fmt.Sprintf("%s#%s", owner, nanoid.Must()) prefixedKey := []byte(fmt.Sprintf("%s/%s", w.Prefix, key)) err := w.startOwningKey(ctx, owner, prefixedKey) if err != nil { @@ -257,8 +292,8 @@ func (w *WeakOwner) Own(ctx context.Context, owner, key string) (func(), error) go w.refreshKey(refreshCtx, owner, prefixedKey) stopOwning := func() { defer refreshCancel() - // Use the original context - in case cancelled twice. - err := w.Store.Delete(ctx, []byte(weakOwnerPartition), prefixedKey) + // This func might be called twice, so use the original ctx not refreshCtx. + err := w.releaseIf(ctx, owner, prefixedKey) if err != nil { w.Log. WithContext(ctx). diff --git a/pkg/kv/util/weak_owner.proto b/pkg/kv/util/weak_owner.proto index 81d0b5697fd..069e0758293 100644 --- a/pkg/kv/util/weak_owner.proto +++ b/pkg/kv/util/weak_owner.proto @@ -7,6 +7,8 @@ package io.treeverse.lakefs.kv; // message data model for weak ownership message WeakOwnership { + // owner is a unique identifier for this particular instantiation. Different concurrent owners + // must have different owner strings. Easiest to set it to something random. string owner = 1; google.protobuf.Timestamp expires = 2; string comment = 3;