Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
127668: storage: add OriginTimestamp to MVCCValueHeader r=raduberinde a=msbutler

This patch adds a new OriginTimestamp field to the MVCCValueHeader, which identifies the timestamp at which the KV was written on the original source cluster during Logical Data Replication (LDR).

Note that the timestamp is marshalled as a pointer to avoid the overhead of storing an empty Timestamp in the value header proto; this works around a known limitation of gogoproto.

In future patches, we will actually bind the origin timestamp here during LDR ingestion and use it to conduct large-scale fingerprinting and last-write-wins (LWW) conflict resolution.

Epic: none

Release note: none

Co-authored-by: Michael Butler <[email protected]>
  • Loading branch information
craig[bot] and msbutler committed Jul 26, 2024
2 parents 7fb362d + c07e737 commit afb9fde
Show file tree
Hide file tree
Showing 13 changed files with 109 additions and 21 deletions.
14 changes: 13 additions & 1 deletion pkg/storage/enginepb/mvcc3.proto
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,14 @@ message MVCCValueHeader {
// when LDR is not being run, 1 identifies a remote write, and 2+ are reserved
// to identify remote clusters.
uint32 origin_id = 5 [(gogoproto.customname) = "OriginID"];

// OriginTimestamp identifies the timestamp this kv was written on the
// original source cluster during Logical Data Replication. A default empty
// timestamp implies that this kv did not originate from a Logical Data
// Replication stream.
util.hlc.Timestamp origin_timestamp = 6 [(gogoproto.nullable) = false,(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/hlc.Timestamp"];

// NextID = 6.
// NextID = 7.
}

// MVCCValueHeaderPure is not to be used directly. It's generated only for use of
Expand All @@ -226,6 +232,11 @@ message MVCCValueHeaderPure {
bool omit_in_rangefeeds = 3;
uint32 import_epoch = 4;
uint32 origin_id = 5 [(gogoproto.customname) = "OriginID"];

// OriginTimestamp is not-nullable in MVCCValueHeader but here it is nullable.
// This is because it leads to more efficient marshaling when the timestamp is
// not set. See the pure() conversion method.
util.hlc.Timestamp origin_timestamp = 6 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/hlc.Timestamp"];
}
// MVCCValueHeaderCrdbTest is not to be used directly. It's generated only for use of
// its marshaling methods by MVCCValueHeader. See the comment there.
Expand All @@ -240,6 +251,7 @@ message MVCCValueHeaderCrdbTest {
bool omit_in_rangefeeds = 3;
uint32 import_epoch = 4;
uint32 origin_id = 5 [(gogoproto.customname) = "OriginID"];
util.hlc.Timestamp origin_timestamp = 6 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/hlc.Timestamp"];
}

// MVCCStatsDelta is convertible to MVCCStats, but uses signed variable width
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/enginepb/mvcc3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func populatedMVCCValueHeader() MVCCValueHeader {
OmitInRangefeeds: true,
ImportEpoch: 1,
OriginID: 1,
OriginTimestamp: hlc.Timestamp{WallTime: 1, Logical: 1},
}
allFieldsSet.KVNemesisSeq.Set(123)
return allFieldsSet
Expand All @@ -44,6 +45,7 @@ func defaultMVCCValueHeader() MVCCValueHeader {
OmitInRangefeeds: false,
ImportEpoch: 0,
OriginID: 0,
OriginTimestamp: hlc.Timestamp{},
}
}

Expand All @@ -62,9 +64,11 @@ func TestMVCCValueHeader_IsEmpty(t *testing.T) {
require.False(t, MVCCValueHeader{OmitInRangefeeds: allFieldsSet.OmitInRangefeeds}.IsEmpty())
require.False(t, MVCCValueHeader{ImportEpoch: allFieldsSet.ImportEpoch}.IsEmpty())
require.False(t, MVCCValueHeader{OriginID: allFieldsSet.OriginID}.IsEmpty())
require.False(t, MVCCValueHeader{OriginTimestamp: allFieldsSet.OriginTimestamp}.IsEmpty())
}

func TestMVCCValueHeader_MarshalUnmarshal(t *testing.T) {
// TODO: test this with random combinations of header fields set.
vh := populatedMVCCValueHeader()
b, err := protoutil.Marshal(&vh)
require.NoError(t, err)
Expand Down
18 changes: 16 additions & 2 deletions pkg/storage/enginepb/mvcc3_valueheader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,30 @@ func (h MVCCValueHeader) IsEmpty() bool {
}

// pure converts h into an MVCCValueHeaderPure, copying LocalTimestamp,
// OmitInRangefeeds, ImportEpoch and OriginID.
func (h *MVCCValueHeader) pure() MVCCValueHeaderPure {
return MVCCValueHeaderPure{
result := MVCCValueHeaderPure{
LocalTimestamp: h.LocalTimestamp,
OmitInRangefeeds: h.OmitInRangefeeds,
ImportEpoch: h.ImportEpoch,
OriginID: h.OriginID,
}
// Leave OriginTimestamp nil when it is empty; otherwise point at h's own
// field (no copy is made, so the result aliases h).
if !h.OriginTimestamp.IsEmpty() {
result.OriginTimestamp = &h.OriginTimestamp
}
return result
}

// crdbTest converts h into an MVCCValueHeaderCrdbTest, copying KVNemesisSeq,
// LocalTimestamp, OmitInRangefeeds, ImportEpoch and OriginID.
func (h *MVCCValueHeader) crdbTest() MVCCValueHeaderCrdbTest {
return (MVCCValueHeaderCrdbTest)(*h)
result := MVCCValueHeaderCrdbTest{
KVNemesisSeq: h.KVNemesisSeq,
LocalTimestamp: h.LocalTimestamp,
OmitInRangefeeds: h.OmitInRangefeeds,
ImportEpoch: h.ImportEpoch,
OriginID: h.OriginID,
}
// Leave OriginTimestamp nil when it is empty; otherwise point at h's own
// field (no copy is made, so the result aliases h).
if !h.OriginTimestamp.IsEmpty() {
result.OriginTimestamp = &h.OriginTimestamp
}
return result
}

// Size implements protoutil.Message.
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2671,6 +2671,9 @@ func mvccPutInternal(
versionValue.OmitInRangefeeds = opts.OmitInRangefeeds
versionValue.ImportEpoch = opts.ImportEpoch
versionValue.OriginID = opts.OriginID
if opts.OriginTimestamp.IsSet() {
versionValue.OriginTimestamp = opts.OriginTimestamp
}

if buildutil.CrdbTestBuild {
if seq, seqOK := kvnemesisutil.FromContext(ctx); seqOK {
Expand Down Expand Up @@ -4622,6 +4625,7 @@ type MVCCWriteOptions struct {
OmitInRangefeeds bool
ImportEpoch uint32
OriginID uint32
OriginTimestamp hlc.Timestamp
// MaxLockConflicts is a maximum number of conflicting locks collected before
// returning LockConflictError. Even single-key writes can encounter multiple
// conflicting shared locks, so the limit is important to bound the number of
Expand Down
25 changes: 25 additions & 0 deletions pkg/storage/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,31 @@ func TestMVCCGetWithValueHeader(t *testing.T) {
}
}

// TestMVCCValueHeaderOriginTimestamp verifies that an OriginTimestamp passed
// via MVCCWriteOptions is returned in the value header by
// MVCCGetWithValueHeader, and that a put without one yields a zero
// OriginTimestamp.
func TestMVCCValueHeaderOriginTimestamp(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	engine := NewDefaultInMemForTesting()
	defer engine.Close()

	originTs := hlc.Timestamp{WallTime: 1}
	readTs := hlc.Timestamp{WallTime: 3}

	// Put a value with a non-zero origin timestamp.
	_, err := MVCCPut(ctx, engine, testKey1, hlc.Timestamp{WallTime: 1}, value1, MVCCWriteOptions{OriginTimestamp: originTs})
	require.NoError(t, err)

	valueRes, vh, err := MVCCGetWithValueHeader(ctx, engine, testKey1, readTs, MVCCGetOptions{})
	require.NoError(t, err)
	require.NotNil(t, valueRes.Value)
	require.Equal(t, originTs, vh.OriginTimestamp)

	// Ensure a regular put has no origin timestamp.
	_, err = MVCCPut(ctx, engine, testKey1, hlc.Timestamp{WallTime: 2}, value1, MVCCWriteOptions{})
	require.NoError(t, err)

	valueRes, vh, err = MVCCGetWithValueHeader(ctx, engine, testKey1, readTs, MVCCGetOptions{})
	require.NoError(t, err)
	// Confirm a value was actually read; otherwise the zero-header check below
	// would pass vacuously when nothing is found.
	require.NotNil(t, valueRes.Value)
	require.Zero(t, vh.OriginTimestamp)
}

// TestMVCCValueHeadersForRangefeeds tests that the value headers used by
// rangefeeds are set correctly.
func TestMVCCValueHeadersForRangefeeds(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/mvcc_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ func (v MVCCValue) SafeFormat(w redact.SafePrinter, _ rune) {
if v.OriginID != 0 {
fields = append(fields, fmt.Sprintf("originID=%v", v.OriginID))
}
if v.OriginTimestamp.IsSet() {
fields = append(fields, fmt.Sprintf("originTs=%s", v.OriginTimestamp))
}
w.Print(strings.Join(fields, ", "))
w.Printf("}")
}
Expand Down
47 changes: 32 additions & 15 deletions pkg/storage/mvcc_value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package storage
import (
"bytes"
"fmt"
"math"
"strings"
"testing"

Expand Down Expand Up @@ -122,37 +123,44 @@ func TestMVCCValueFormat(t *testing.T) {
intVal.SetInt(17)
var importEpoch uint32 = 3
var originID uint32 = 1
var originTs = hlc.Timestamp{WallTime: 1, Logical: 1}

valHeader := enginepb.MVCCValueHeader{}
valHeader.LocalTimestamp = hlc.ClockTimestamp{WallTime: 9}

valHeaderFull := valHeader
valHeaderFull.ImportEpoch = importEpoch
valHeaderFull.OriginID = originID
valHeaderFull.OriginTimestamp = originTs

valHeaderWithJobIDOnly := enginepb.MVCCValueHeader{ImportEpoch: importEpoch}

valHeaderWithOriginIDOnly := enginepb.MVCCValueHeader{OriginID: originID}

valHeaderWithOriginTsOnly := enginepb.MVCCValueHeader{OriginTimestamp: originTs}

testcases := map[string]struct {
val MVCCValue
expect string
}{
"tombstone": {val: MVCCValue{}, expect: "/<empty>"},
"bytes": {val: MVCCValue{Value: strVal}, expect: "/BYTES/foo"},
"int": {val: MVCCValue{Value: intVal}, expect: "/INT/17"},
"header+tombstone": {val: MVCCValue{MVCCValueHeader: valHeader}, expect: "{localTs=0.000000009,0}/<empty>"},
"header+bytes": {val: MVCCValue{MVCCValueHeader: valHeader, Value: strVal}, expect: "{localTs=0.000000009,0}/BYTES/foo"},
"header+int": {val: MVCCValue{MVCCValueHeader: valHeader, Value: intVal}, expect: "{localTs=0.000000009,0}/INT/17"},
"headerJobIDOnly+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobIDOnly}, expect: "{importEpoch=3}/<empty>"},
"headerJobIDOnly+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobIDOnly, Value: strVal}, expect: "{importEpoch=3}/BYTES/foo"},
"headerJobIDOnly+int": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobIDOnly, Value: intVal}, expect: "{importEpoch=3}/INT/17"},
"headerOriginIDOnly+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginIDOnly}, expect: "{originID=1}/<empty>"},
"headerOriginIDOnly+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginIDOnly, Value: strVal}, expect: "{originID=1}/BYTES/foo"},
"headerOriginIDOnly+int": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginIDOnly, Value: intVal}, expect: "{originID=1}/INT/17"},
"headerFull+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderFull}, expect: "{localTs=0.000000009,0, importEpoch=3, originID=1}/<empty>"},
"headerFull+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderFull, Value: strVal}, expect: "{localTs=0.000000009,0, importEpoch=3, originID=1}/BYTES/foo"},
"headerFull+int": {val: MVCCValue{MVCCValueHeader: valHeaderFull, Value: intVal}, expect: "{localTs=0.000000009,0, importEpoch=3, originID=1}/INT/17"},
"tombstone": {val: MVCCValue{}, expect: "/<empty>"},
"bytes": {val: MVCCValue{Value: strVal}, expect: "/BYTES/foo"},
"int": {val: MVCCValue{Value: intVal}, expect: "/INT/17"},
"timestamp+tombstone": {val: MVCCValue{MVCCValueHeader: valHeader}, expect: "{localTs=0.000000009,0}/<empty>"},
"timestamp+bytes": {val: MVCCValue{MVCCValueHeader: valHeader, Value: strVal}, expect: "{localTs=0.000000009,0}/BYTES/foo"},
"timestamp+int": {val: MVCCValue{MVCCValueHeader: valHeader, Value: intVal}, expect: "{localTs=0.000000009,0}/INT/17"},
"jobid+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobIDOnly}, expect: "{importEpoch=3}/<empty>"},
"jobid+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobIDOnly, Value: strVal}, expect: "{importEpoch=3}/BYTES/foo"},
"jobid+int": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobIDOnly, Value: intVal}, expect: "{importEpoch=3}/INT/17"},
"originid+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginIDOnly}, expect: "{originID=1}/<empty>"},
"originid+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginIDOnly, Value: strVal}, expect: "{originID=1}/BYTES/foo"},
"originid+int": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginIDOnly, Value: intVal}, expect: "{originID=1}/INT/17"},
"fullheader+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderFull}, expect: "{localTs=0.000000009,0, importEpoch=3, originID=1, originTs=0.000000001,1}/<empty>"},
"fullheader+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderFull, Value: strVal}, expect: "{localTs=0.000000009,0, importEpoch=3, originID=1, originTs=0.000000001,1}/BYTES/foo"},
"fullheader+int": {val: MVCCValue{MVCCValueHeader: valHeaderFull, Value: intVal}, expect: "{localTs=0.000000009,0, importEpoch=3, originID=1, originTs=0.000000001,1}/INT/17"},
"origints+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginTsOnly}, expect: "{originTs=0.000000001,1}/<empty>"},
"origints+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginTsOnly, Value: strVal}, expect: "{originTs=0.000000001,1}/BYTES/foo"},
"origints+int": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginTsOnly, Value: intVal}, expect: "{originTs=0.000000001,1}/INT/17"},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
Expand All @@ -163,23 +171,29 @@ func TestMVCCValueFormat(t *testing.T) {

func TestEncodeDecodeMVCCValue(t *testing.T) {
defer leaktest.AfterTest(t)()

// Force the production fast path by deactivating the
// `disableSimpleValueEncoding` test constant.
DisableMetamorphicSimpleValueEncoding(t)

var strVal, intVal roachpb.Value
strVal.SetString("foo")
intVal.SetInt(17)
var importEpoch uint32 = 3
var originID uint32 = 1
var originTs = hlc.Timestamp{WallTime: math.MaxInt64, Logical: 1}

valHeader := enginepb.MVCCValueHeader{}
valHeader.LocalTimestamp = hlc.ClockTimestamp{WallTime: 9}

valHeaderFull := valHeader
valHeaderFull.ImportEpoch = importEpoch
valHeaderFull.OriginID = originID
valHeaderFull.OriginTimestamp = originTs

valHeaderWithJobIDOnly := enginepb.MVCCValueHeader{ImportEpoch: importEpoch}
valHeaderWithOriginIDOnly := enginepb.MVCCValueHeader{OriginID: originID}
valHeaderWithOriginTsOnly := enginepb.MVCCValueHeader{OriginTimestamp: originTs}

testcases := map[string]struct {
val MVCCValue
Expand All @@ -199,6 +213,9 @@ func TestEncodeDecodeMVCCValue(t *testing.T) {
"headerFull+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderFull}},
"headerFull+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderFull, Value: strVal}},
"headerFull+int": {val: MVCCValue{MVCCValueHeader: valHeaderFull, Value: intVal}},
"headerOriginTsOnly+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginTsOnly}},
"headerOriginTsOnly+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginTsOnly, Value: strVal}},
"headerOriginTsOnly+int": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginTsOnly, Value: intVal}},
}
w := echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name()))
for name, tc := range testcases {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
encoded: 00000008650a020809200328010000000003666f6f
encoded: 00000016650a02080920032801320c08ffffffffffffffff7f10010000000003666f6f
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
encoded: 00000008650a02080920032801000000000122
encoded: 00000016650a02080920032801320c08ffffffffffffffff7f1001000000000122
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
encoded: 00000008650a02080920032801
encoded: 00000016650a02080920032801320c08ffffffffffffffff7f1001
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
echo
----
encoded: 00000010650a00320c08ffffffffffffffff7f10010000000003666f6f
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
echo
----
encoded: 00000010650a00320c08ffffffffffffffff7f1001000000000122
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
echo
----
encoded: 00000010650a00320c08ffffffffffffffff7f1001

0 comments on commit afb9fde

Please sign in to comment.