Skip to content

Commit

Permalink
kv: extend CPut with FailOnTombstones option
Browse files Browse the repository at this point in the history
This commit extends the CPut command with FailOnTombstones option that
we already have in the InitPut - it makes it so that the CPut fails with
a ConditionFailedError if it encounters a tombstone. This begins the
process of folding all usages of InitPuts into CPuts. (We actually have
only one place where we use InitPuts with this flag set - in the sql
liveness system which allows us to ensure that after a session is dead,
it cannot be resurrected, which allows us to keep the dead session cache
easily.)

InitPut request was introduced long time ago in
1a95630 when we considered extending
CPut with a boolean for special tombstone handling too burdensome and
ugly. Since then we have extended the CPut with a few options, so
adding an extra one doesn't seem like a big deal anymore. On the plus
side, it removes the confusion that I personally had for why we used
InitPut as opposed to CPut with nil expected value in a few places (all
were removed in the previous commit).

The deprecation was alluded to during the review of #35157.

In terms of deprecating the InitPut altogether here is the expected
timeline:
- sometime after compatibility with 24.3 is no longer needed (so 25.3+),
we will remove the last spot were we still issue InitPut in the sql
liveness system
- sometime after compatibilty with the version mentioned above (say
26.1+), we'll be able to remove all InitPut-related code.

Release note: None
  • Loading branch information
yuzefovich committed Dec 23, 2024
1 parent 45bd97c commit ef02b98
Show file tree
Hide file tree
Showing 16 changed files with 208 additions and 36 deletions.
22 changes: 15 additions & 7 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,14 +544,20 @@ func (b *Batch) PutInline(key, value interface{}) {
// expValue needs to correspond to a Value.TagAndDataBytes() - i.e. a key's
// value without the checksum (as the checksum includes the key too).
func (b *Batch) CPut(key, value interface{}, expValue []byte) {
b.cputInternal(key, value, expValue, false, false)
b.cputInternal(key, value, expValue, false, false, false)
}

// CPutAllowingIfNotExists is like CPut except it also allows the Put when the
// existing entry does not exist -- i.e. it succeeds if there is no existing
// entry or the existing entry has the expected value.
func (b *Batch) CPutAllowingIfNotExists(key, value interface{}, expValue []byte) {
b.cputInternal(key, value, expValue, true, false)
b.cputInternal(key, value, expValue, true, false, false)
}

// CPutFailOnTombstones is like CPut except it fails with a
// ConditionedFailedError if it encounters a tombstone.
func (b *Batch) CPutFailOnTombstones(key, value interface{}, expValue []byte) {
b.cputInternal(key, value, expValue, false, false, true)
}

// CPutWithOriginTimestamp is like CPut except that it also sets the
Expand All @@ -577,7 +583,7 @@ func (b *Batch) CPutWithOriginTimestamp(
b.initResult(0, 1, notRaw, err)
return
}
r := kvpb.NewConditionalPut(k, v, expValue, false)
r := kvpb.NewConditionalPut(k, v, expValue, false, false)
r.(*kvpb.ConditionalPutRequest).OriginTimestamp = ts
r.(*kvpb.ConditionalPutRequest).ShouldWinOriginTimestampTie = shouldWinTie
b.appendReqs(r)
Expand All @@ -600,11 +606,11 @@ func (b *Batch) CPutWithOriginTimestamp(
// A nil value can be used to delete the respective key, since there is no
// DelInline(). This is different from CPut().
func (b *Batch) CPutInline(key, value interface{}, expValue []byte) {
b.cputInternal(key, value, expValue, false, true)
b.cputInternal(key, value, expValue, false, true, false)
}

func (b *Batch) cputInternal(
key, value interface{}, expValue []byte, allowNotExist bool, inline bool,
key, value interface{}, expValue []byte, allowNotExist bool, inline bool, failOnTombstones bool,
) {
k, err := marshalKey(key)
if err != nil {
Expand All @@ -617,9 +623,9 @@ func (b *Batch) cputInternal(
return
}
if inline {
b.appendReqs(kvpb.NewConditionalPutInline(k, v, expValue, allowNotExist))
b.appendReqs(kvpb.NewConditionalPutInline(k, v, expValue, allowNotExist, failOnTombstones))
} else {
b.appendReqs(kvpb.NewConditionalPut(k, v, expValue, allowNotExist))
b.appendReqs(kvpb.NewConditionalPut(k, v, expValue, allowNotExist, failOnTombstones))
}
b.approxMutationReqBytes += len(k) + len(v.RawBytes)
b.initResult(1, 1, notRaw, nil)
Expand Down Expand Up @@ -711,6 +717,8 @@ func (b *Batch) CPutValuesEmpty(bs BulkSource[roachpb.Value]) {
// key can be either a byte slice or a string. value can be any key type, a
// protoutil.Message or any Go primitive type (bool, int, etc). It is illegal
// to set value to nil.
// TODO(yuzefovich): this can be removed after compatibility with 24.3 is no
// longer needed.
func (b *Batch) InitPut(key, value interface{}, failOnTombstones bool) {
k, err := marshalKey(key)
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1656,7 +1656,7 @@ func NewPutInline(key roachpb.Key, value roachpb.Value) Request {
// them. The caller retains ownership of expVal; NewConditionalPut will copy it
// into the request.
func NewConditionalPut(
key roachpb.Key, value roachpb.Value, expValue []byte, allowNotExist bool,
key roachpb.Key, value roachpb.Value, expValue []byte, allowNotExist bool, failOnTombstones bool,
) Request {
value.InitChecksum(key)
return &ConditionalPutRequest{
Expand All @@ -1666,6 +1666,7 @@ func NewConditionalPut(
Value: value,
ExpBytes: expValue,
AllowIfDoesNotExist: allowNotExist,
FailOnTombstones: failOnTombstones,
}
}

Expand All @@ -1676,7 +1677,7 @@ func NewConditionalPut(
// them. The caller retains ownership of expVal; NewConditionalPut will copy it
// into the request.
func NewConditionalPutInline(
key roachpb.Key, value roachpb.Value, expValue []byte, allowNotExist bool,
key roachpb.Key, value roachpb.Value, expValue []byte, allowNotExist bool, failOnTombstones bool,
) Request {
value.InitChecksum(key)
return &ConditionalPutRequest{
Expand All @@ -1687,6 +1688,7 @@ func NewConditionalPutInline(
ExpBytes: expValue,
AllowIfDoesNotExist: allowNotExist,
Inline: true,
FailOnTombstones: failOnTombstones,
}
}

Expand Down Expand Up @@ -2555,10 +2557,16 @@ func (writeOptions *WriteOptions) GetOriginTimestamp() hlc.Timestamp {
}

func (r *ConditionalPutRequest) Validate() error {
if r.AllowIfDoesNotExist && r.FailOnTombstones {
return errors.AssertionFailedf("invalid ConditionalPutRequest: AllowIfDoesNotExist and FailOnTombstones are incompatible")
}
if !r.OriginTimestamp.IsEmpty() {
if r.AllowIfDoesNotExist {
return errors.AssertionFailedf("invalid ConditionalPutRequest: AllowIfDoesNotExist and non-empty OriginTimestamp are incompatible")
}
if r.FailOnTombstones {
return errors.AssertionFailedf("invalid ConditionalPutRequest: FailOnTombstones and non-empty OriginTimestamp are incompatible")
}
}
return nil
}
10 changes: 10 additions & 0 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ message ConditionalPutRequest {
// exist with that value. Passing this indicates that it is also OK if the key
// does not exist. This is useful when a given value is expected but it is
// possible it has not yet been written.
//
// Cannot be combined with FailOnTombstones nor with OriginTimestamp.
bool allow_if_does_not_exist = 5;
// Specify as true to put the value without a corresponding
// timestamp. This option should be used with care as it precludes
Expand Down Expand Up @@ -371,6 +373,11 @@ message ConditionalPutRequest {
//
// This must only be used in conjunction with OriginTimestamp.
bool should_win_origin_timestamp_tie = 9;

// If true, tombstones cause ConditionFailedErrors.
//
// Cannot be combined with AllowIfDoesNotExit nor with OriginTimestamp.
bool failOnTombstones = 10;
}

// A ConditionalPutResponse is the return value from the
Expand All @@ -385,6 +392,9 @@ message ConditionalPutResponse {
// - If key exists, returns a ConditionFailedError if value != existing value
// If failOnTombstones is set to true, tombstone values count as mismatched
// values and will cause a ConditionFailedError.
// TODO(yuzefovich): this can be removed once the compatibility with the version
// in which we stopped issuing InitPuts (some time after 25.3) is no longer
// needed.
message InitPutRequest {
RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
Value value = 2 [(gogoproto.nullable) = false];
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_conditional_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func ConditionalPut(
return result.Result{}, errors.AssertionFailedf("OriginTimestamp cannot be passed via CPut arg and in request header")
}

failOnTombstones := args.FailOnTombstones && !cArgs.EvalCtx.EvalKnobs().DisableInitPutCPutFailOnTombstones
opts := storage.ConditionalPutWriteOptions{
MVCCWriteOptions: storage.MVCCWriteOptions{
Txn: h.Txn,
Expand All @@ -80,6 +81,7 @@ func ConditionalPut(
Category: fs.BatchEvalReadCategory,
},
AllowIfDoesNotExist: storage.CPutMissingBehavior(args.AllowIfDoesNotExist),
FailOnTombstones: storage.CPutTombstoneBehavior(failOnTombstones),
OriginTimestamp: args.OriginTimestamp,
ShouldWinOriginTimestampTie: args.ShouldWinOriginTimestampTie,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_init_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func InitPut(
args := cArgs.Args.(*kvpb.InitPutRequest)
h := cArgs.Header

failOnTombstones := args.FailOnTombstones && !cArgs.EvalCtx.EvalKnobs().DisableInitPutFailOnTombstones
failOnTombstones := args.FailOnTombstones && !cArgs.EvalCtx.EvalKnobs().DisableInitPutCPutFailOnTombstones
opts := storage.MVCCWriteOptions{
Txn: h.Txn,
LocalTimestamp: cArgs.Now,
Expand Down
9 changes: 5 additions & 4 deletions pkg/kv/kvserver/kvserverbase/knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ type BatchEvalTestingKnobs struct {
// explanation of why.
AllowGCWithNewThresholdAndKeys bool

// DisableInitPutFailOnTombstones disables FailOnTombstones for InitPut. This
// is useful together with e.g. StoreTestingKnobs.GlobalMVCCRangeTombstone,
// where we still want InitPut to succeed on top of the range tombstone.
DisableInitPutFailOnTombstones bool
// DisableInitPutCPutFailOnTombstones disables FailOnTombstones for InitPut
// and CPut. This is useful together with e.g.
// StoreTestingKnobs.GlobalMVCCRangeTombstone, where we still want InitPut
// and CPut to succeed on top of the range tombstone.
DisableInitPutCPutFailOnTombstones bool

// UseRangeTombstonesForPointDeletes will use point-sized MVCC range
// tombstones when deleting point keys, to increase test coverage. These
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1372,7 +1372,7 @@ func cPutArgs(key roachpb.Key, value, expValue []byte) kvpb.ConditionalPutReques
if expValue != nil {
expValue = roachpb.MakeValueFromBytes(expValue).TagAndDataBytes()
}
req := kvpb.NewConditionalPut(key, roachpb.MakeValueFromBytes(value), expValue, false /* allowNotExist */)
req := kvpb.NewConditionalPut(key, roachpb.MakeValueFromBytes(value), expValue, false /* allowNotExist */, false /* failOnTombstones */)
return *req.(*kvpb.ConditionalPutRequest)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (cfg *BaseConfig) InitTestingKnobs() {
}
storeKnobs := cfg.TestingKnobs.Store.(*kvserver.StoreTestingKnobs)
storeKnobs.GlobalMVCCRangeTombstone = true
storeKnobs.EvalKnobs.DisableInitPutFailOnTombstones = true
storeKnobs.EvalKnobs.DisableInitPutCPutFailOnTombstones = true
cfg.TestingKnobs.RangeFeed.(*rangefeed.TestingKnobs).IgnoreOnDeleteRangeError = true
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/logictest/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -1536,8 +1536,8 @@ func (t *logicTest) newCluster(
DisableConsistencyQueue: true,
GlobalMVCCRangeTombstone: globalMVCCRangeTombstone,
EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
DisableInitPutFailOnTombstones: ignoreMVCCRangeTombstoneErrors,
UseRangeTombstonesForPointDeletes: shouldUseMVCCRangeTombstonesForPointDeletes,
DisableInitPutCPutFailOnTombstones: ignoreMVCCRangeTombstoneErrors,
UseRangeTombstonesForPointDeletes: shouldUseMVCCRangeTombstonesForPointDeletes,
},
},
RangeFeed: &rangefeed.TestingKnobs{
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sqlliveness/slstorage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvpb",
Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/sqlliveness/slstorage/slstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
Expand Down Expand Up @@ -529,7 +530,11 @@ func (s *Storage) Insert(

}
v := encodeValue(expiration)
batch.InitPut(k, &v, true)
if s.settings.Version.IsActive(ctx, clusterversion.V25_1) {
batch.CPutFailOnTombstones(k, &v, nil /* expValue */)
} else {
batch.InitPut(k, &v, true)
}

return txn.CommitInBatch(ctx, batch)
}); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/metamorphic/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,9 @@ func (m mvccCPutOp) run(ctx context.Context) string {

_, err := storage.MVCCConditionalPut(ctx, writer, m.key,
txn.ReadTimestamp, m.value, m.expVal, storage.ConditionalPutWriteOptions{
MVCCWriteOptions: storage.MVCCWriteOptions{Txn: txn},
MVCCWriteOptions: storage.MVCCWriteOptions{Txn: txn},
// TODO: why we unconditionally set this flag?
// TODO: should we add FailOnTombstones?
AllowIfDoesNotExist: true,
})
if err != nil {
Expand Down
42 changes: 38 additions & 4 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2883,12 +2883,25 @@ const (
CPutFailIfMissing CPutMissingBehavior = false
)

// CPutTombstoneBehavior describes the handling of a tombstone.
type CPutTombstoneBehavior bool

const (
// CPutIgnoreTombstones is used to indicate that CPut should ignore the
// tombstones and treat them as key non-existing.
CPutIgnoreTombstones CPutTombstoneBehavior = false
// CPutFailOnTombstones is used to indicate that CPut should fail if it
// finds a tombstone.
CPutFailOnTombstones CPutTombstoneBehavior = true
)

// ConditionalPutWriteOptions bundles options for the
// MVCCConditionalPut and MVCCBlindConditionalPut functions.
type ConditionalPutWriteOptions struct {
MVCCWriteOptions

AllowIfDoesNotExist CPutMissingBehavior
FailOnTombstones CPutTombstoneBehavior
// OriginTimestamp, if set, indicates that the caller wants to put the
// value only if any existing key is older than this timestamp.
//
Expand Down Expand Up @@ -3011,10 +3024,16 @@ func mvccConditionalPutUsingIter(
expBytes []byte,
opts ConditionalPutWriteOptions,
) (roachpb.LockAcquisition, error) {
if bool(opts.AllowIfDoesNotExist) && bool(opts.FailOnTombstones) {
return roachpb.LockAcquisition{}, errors.AssertionFailedf("AllowIfDoesNotExist and FailOnTombstones are incompatible")
}
if !opts.OriginTimestamp.IsEmpty() {
if bool(opts.AllowIfDoesNotExist) {
return roachpb.LockAcquisition{}, errors.AssertionFailedf("AllowIfDoesNotExist and non-zero OriginTimestamp are incompatible")
}
if bool(opts.FailOnTombstones) {
return roachpb.LockAcquisition{}, errors.AssertionFailedf("FailOnTombstones and non-zero OriginTimestamp are incompatible")
}
putIsInline := timestamp.IsEmpty()
if putIsInline {
return roachpb.LockAcquisition{}, errors.AssertionFailedf("inline put and non-zero OriginTimestamp are incompatible")
Expand All @@ -3023,11 +3042,26 @@ func mvccConditionalPutUsingIter(

var valueFn func(existVal optionalValue) (roachpb.Value, error)
if opts.OriginTimestamp.IsEmpty() {
valueFn = func(actualValue optionalValue) (roachpb.Value, error) {
if err := maybeConditionFailedError(expBytes, actualValue, bool(opts.AllowIfDoesNotExist)); err != nil {
return roachpb.Value{}, err
if opts.FailOnTombstones {
valueFn = func(existVal optionalValue) (roachpb.Value, error) {
if existVal.IsTombstone() {
// We found a tombstone and FailOnTombstones is true: fail.
return roachpb.Value{}, &kvpb.ConditionFailedError{
ActualValue: existVal.ToPointer(),
}
}
if err := maybeConditionFailedError(expBytes, existVal, false /* allowNoExisting */); err != nil {
return roachpb.Value{}, err
}
return value, nil
}
} else {
valueFn = func(existVal optionalValue) (roachpb.Value, error) {
if err := maybeConditionFailedError(expBytes, existVal, bool(opts.AllowIfDoesNotExist)); err != nil {
return roachpb.Value{}, err
}
return value, nil
}
return value, nil
}
} else {
valueFn = func(existVal optionalValue) (roachpb.Value, error) {
Expand Down
13 changes: 9 additions & 4 deletions pkg/storage/mvcc_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ var (
// check_for_acquire_lock t=<name> k=<key> str=<strength> [maxLockConflicts=<int>] [targetLockConflictBytes=<int>]
// acquire_lock t=<name> k=<key> str=<strength> [maxLockConflicts=<int>] [targetLockConflictBytes=<int>]
//
// cput [t=<name>] [ts=<int>[,<int>]] [localTs=<int>[,<int>]] [resolve [status=<txnstatus>]] [ambiguousReplay] [maxLockConflicts=<int>] [targetLockConflictBytes=<int>] k=<key> v=<string> [raw] [cond=<string>]
// cput [t=<name>] [ts=<int>[,<int>]] [localTs=<int>[,<int>]] [resolve [status=<txnstatus>]] [ambiguousReplay] [maxLockConflicts=<int>] [targetLockConflictBytes=<int>] k=<key> v=<string> [raw] [cond=<string>] [allow_missing] [failOnTombstones]
// del [t=<name>] [ts=<int>[,<int>]] [localTs=<int>[,<int>]] [resolve [status=<txnstatus>]] [ambiguousReplay] [maxLockConflicts=<int>] [targetLockConflictBytes=<int>] k=<key>
// del_range [t=<name>] [ts=<int>[,<int>]] [localTs=<int>[,<int>]] [resolve [status=<txnstatus>]] [ambiguousReplay] [maxLockConflicts=<int>] [targetLockConflictBytes=<int>] k=<key> end=<key> [max=<max>] [returnKeys]
// del_range_ts [ts=<int>[,<int>]] [localTs=<int>[,<int>]] [maxLockConflicts=<int>] [targetLockConflictBytes=<int>] k=<key> end=<key> [idempotent] [noCoveredStats]
Expand Down Expand Up @@ -1293,9 +1293,13 @@ func cmdCPut(e *evalCtx) error {
rexpVal := e.getValInternal("cond")
expVal = rexpVal.TagAndDataBytes()
}
behavior := storage.CPutFailIfMissing
missingBehavior := storage.CPutFailIfMissing
if e.hasArg("allow_missing") {
behavior = storage.CPutAllowIfMissing
missingBehavior = storage.CPutAllowIfMissing
}
tombstoneBehavior := storage.CPutIgnoreTombstones
if e.hasArg("failOnTombstones") {
tombstoneBehavior = storage.CPutFailOnTombstones
}

originTimestamp := hlc.Timestamp{}
Expand All @@ -1314,7 +1318,8 @@ func cmdCPut(e *evalCtx) error {
ReplayWriteTimestampProtection: e.getAmbiguousReplay(),
MaxLockConflicts: e.getMaxLockConflicts(),
},
AllowIfDoesNotExist: behavior,
AllowIfDoesNotExist: missingBehavior,
FailOnTombstones: tombstoneBehavior,
OriginTimestamp: originTimestamp,
}
acq, err := storage.MVCCConditionalPut(e.ctx, rw, key, ts, val, expVal, opts)
Expand Down
Loading

0 comments on commit ef02b98

Please sign in to comment.