Skip to content

Commit

Permalink
[CR] Only reset if owned when cancelling weak ownership
Browse files Browse the repository at this point in the history
If ownership has already been lost to another thread, do NOT delete
ownership when released.

- KV does not provide a DeleteIf operation.  Instead, use SetIf with an
  always-expired timestamp.
- Along the way, ensure "owner" string is truly unique by stringing a nanoid
  onto it.  Currently owner is the request ID, which should be unique - but
  adding randomness ensures it will always be unique regardless of the
  calling system.
  • Loading branch information
arielshaqed committed Oct 14, 2024
1 parent c7ecd5f commit 0ec602d
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 5 deletions.
45 changes: 40 additions & 5 deletions pkg/kv/util/weak_owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/rand"
"time"

nanoid "github.com/matoous/go-nanoid/v2"
"github.com/treeverse/lakefs/pkg/kv"
"github.com/treeverse/lakefs/pkg/logging"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
Expand Down Expand Up @@ -244,10 +245,44 @@ func (w *WeakOwner) startOwningKey(ctx context.Context, owner string, prefixedKe
}
}

// Own blocks until it gets weak ownership of key for owner. Ownership
// will be refreshed at resolution interval. It returns a function to stop
// owning key.
// releaseIf releases prefixedKey if it has the owner.
func (w *WeakOwner) releaseIf(ctx context.Context, owner string, prefixedKey []byte) error {
log := w.Log.WithContext(ctx).WithFields(logging.Fields{
"prefixed_key": string(prefixedKey),
"owner": owner,
})
ownership := WeakOwnership{}
predicate, err := kv.GetMsg(ctx, w.Store, weakOwnerPartition, prefixedKey, &ownership)
if err != nil {
return fmt.Errorf("get ownership message %s to release it from %s: %w", string(prefixedKey), owner, err)
}
log = log.WithField("new_owner", ownership.Owner)

if ownership.Owner != owner {
log.Info("Lost ownership race before trying to release")
return nil
}
ownership.Owner = fmt.Sprintf("[released] %s", ownership.Owner)
// Set expiration to the beginning of time - definitely expired.
ownership.Expires.Reset()
err = kv.SetMsgIf(ctx, w.Store, weakOwnerPartition, prefixedKey, &ownership, predicate)
if errors.Is(err, kv.ErrPredicateFailed) {
log.WithFields(logging.Fields{
"prefixed_key": string(prefixedKey),
"owner": owner,
"new_owner": ownership.Owner,
}).Info("Lost ownership race while trying to release")
// Succeeded: we did not
return nil
}
return err
}

// Own blocks until it gets weak ownership of key for owner. Ownership will be refreshed at
// resolution interval. It returns a function to stop owning key. Own appends its random slug to
// owner, to identify the owner uniquely.
func (w *WeakOwner) Own(ctx context.Context, owner, key string) (func(), error) {
owner = fmt.Sprintf("%s#%s", owner, nanoid.Must())
prefixedKey := []byte(fmt.Sprintf("%s/%s", w.Prefix, key))
err := w.startOwningKey(ctx, owner, prefixedKey)
if err != nil {
Expand All @@ -257,8 +292,8 @@ func (w *WeakOwner) Own(ctx context.Context, owner, key string) (func(), error)
go w.refreshKey(refreshCtx, owner, prefixedKey)
stopOwning := func() {
defer refreshCancel()
// Use the original context - in case cancelled twice.
err := w.Store.Delete(ctx, []byte(weakOwnerPartition), prefixedKey)
// This func might be called twice, so use the original ctx not refreshCtx.
err := w.releaseIf(ctx, owner, prefixedKey)
if err != nil {
w.Log.
WithContext(ctx).
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/util/weak_owner.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package io.treeverse.lakefs.kv;

// message data model for weak ownership
message WeakOwnership {
// owner is a unique identifier for this particular instantiation. Different concurrent owners
// must have different owner strings. Easiest to set it to something random.
string owner = 1;
google.protobuf.Timestamp expires = 2;
string comment = 3;
Expand Down

0 comments on commit 0ec602d

Please sign in to comment.