From c07e737546379f234e232390d9529f03d3d28c74 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Wed, 17 Jul 2024 14:23:38 -0400 Subject: [PATCH] storage: add OriginTimestamp to MVCCValueHeader This patch adds a new OriginTimestamp field to the MVCCValueHeader which will identify the timestamp the kv was written on the original source cluster during Logical Data Replication. Note that OriginTimestamp is non-nullable on MVCCValueHeader itself but is marshaled as a nullable pointer in the generated MVCCValueHeaderPure and MVCCValueHeaderCrdbTest messages. This avoids the overhead of storing an empty Timestamp in the value header proto when the timestamp is not set; encoding an empty non-nullable message field is a known limitation of gogoproto. In future patches, we will actually bind the origin timestamp here during LDR ingestion and use it to conduct large scale fingerprinting and last-write-wins (LWW) conflict resolution. Epic: none Release note: none --- pkg/storage/enginepb/mvcc3.proto | 14 +++++- pkg/storage/enginepb/mvcc3_test.go | 4 ++ pkg/storage/enginepb/mvcc3_valueheader.go | 18 ++++++- pkg/storage/mvcc.go | 4 ++ pkg/storage/mvcc_test.go | 25 ++++++++++ pkg/storage/mvcc_value.go | 3 ++ pkg/storage/mvcc_value_test.go | 47 +++++++++++++------ .../headerFull_bytes | 2 +- .../TestEncodeDecodeMVCCValue/headerFull_int | 2 +- .../headerFull_tombstone | 2 +- .../headerOriginTsOnly_bytes | 3 ++ .../headerOriginTsOnly_int | 3 ++ .../headerOriginTsOnly_tombstone | 3 ++ 13 files changed, 109 insertions(+), 21 deletions(-) create mode 100644 pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerOriginTsOnly_bytes create mode 100644 pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerOriginTsOnly_int create mode 100644 pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerOriginTsOnly_tombstone diff --git a/pkg/storage/enginepb/mvcc3.proto b/pkg/storage/enginepb/mvcc3.proto index 63f6afaf1bf9..5984b4dc298b 100644 --- a/pkg/storage/enginepb/mvcc3.proto +++ b/pkg/storage/enginepb/mvcc3.proto @@ -213,8 +213,14 @@ message MVCCValueHeader { // when LDR is not being run, 1 identifies a remote write, and 2+ are reserved // to identify remote clusters. 
uint32 origin_id = 5 [(gogoproto.customname) = "OriginID"]; + + // OriginTimestamp identifies the timestamp this kv was written on the + // original source cluster during Logical Data Replication. A default empty + // timestamp implies that this kv did not originate from a Logical Data + // Replication stream. + util.hlc.Timestamp origin_timestamp = 6 [(gogoproto.nullable) = false,(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/hlc.Timestamp"]; - // NextID = 6. + // NextID = 7. } // MVCCValueHeaderPure is not to be used directly. It's generated only for use of @@ -226,6 +232,11 @@ message MVCCValueHeaderPure { bool omit_in_rangefeeds = 3; uint32 import_epoch = 4; uint32 origin_id = 5 [(gogoproto.customname) = "OriginID"]; + + // OriginTimestamp is not-nullable in MVCCValueHeader but here it is nullable. + // This is because it leads to more efficient marshaling when the timestamp is + // not set. See the pure() conversion method. + util.hlc.Timestamp origin_timestamp = 6 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/hlc.Timestamp"]; } // MVCCValueHeaderCrdbTest is not to be used directly. It's generated only for use of // its marshaling methods by MVCCValueHeader. See the comment there. 
@@ -240,6 +251,7 @@ message MVCCValueHeaderCrdbTest { bool omit_in_rangefeeds = 3; uint32 import_epoch = 4; uint32 origin_id = 5 [(gogoproto.customname) = "OriginID"]; + util.hlc.Timestamp origin_timestamp = 6 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/hlc.Timestamp"]; } // MVCCStatsDelta is convertible to MVCCStats, but uses signed variable width diff --git a/pkg/storage/enginepb/mvcc3_test.go b/pkg/storage/enginepb/mvcc3_test.go index 31b0f19a1d75..a1f3ed3fa8e3 100644 --- a/pkg/storage/enginepb/mvcc3_test.go +++ b/pkg/storage/enginepb/mvcc3_test.go @@ -33,6 +33,7 @@ func populatedMVCCValueHeader() MVCCValueHeader { OmitInRangefeeds: true, ImportEpoch: 1, OriginID: 1, + OriginTimestamp: hlc.Timestamp{WallTime: 1, Logical: 1}, } allFieldsSet.KVNemesisSeq.Set(123) return allFieldsSet @@ -44,6 +45,7 @@ func defaultMVCCValueHeader() MVCCValueHeader { OmitInRangefeeds: false, ImportEpoch: 0, OriginID: 0, + OriginTimestamp: hlc.Timestamp{}, } } @@ -62,9 +64,11 @@ func TestMVCCValueHeader_IsEmpty(t *testing.T) { require.False(t, MVCCValueHeader{OmitInRangefeeds: allFieldsSet.OmitInRangefeeds}.IsEmpty()) require.False(t, MVCCValueHeader{ImportEpoch: allFieldsSet.ImportEpoch}.IsEmpty()) require.False(t, MVCCValueHeader{OriginID: allFieldsSet.OriginID}.IsEmpty()) + require.False(t, MVCCValueHeader{OriginTimestamp: allFieldsSet.OriginTimestamp}.IsEmpty()) } func TestMVCCValueHeader_MarshalUnmarshal(t *testing.T) { + // TODO: test this with random combinations of header fields set. 
vh := populatedMVCCValueHeader() b, err := protoutil.Marshal(&vh) require.NoError(t, err) diff --git a/pkg/storage/enginepb/mvcc3_valueheader.go b/pkg/storage/enginepb/mvcc3_valueheader.go index f09d61aa3224..47a1bb50c219 100644 --- a/pkg/storage/enginepb/mvcc3_valueheader.go +++ b/pkg/storage/enginepb/mvcc3_valueheader.go @@ -19,16 +19,30 @@ func (h MVCCValueHeader) IsEmpty() bool { } func (h *MVCCValueHeader) pure() MVCCValueHeaderPure { - return MVCCValueHeaderPure{ + result := MVCCValueHeaderPure{ LocalTimestamp: h.LocalTimestamp, OmitInRangefeeds: h.OmitInRangefeeds, ImportEpoch: h.ImportEpoch, OriginID: h.OriginID, } + if !h.OriginTimestamp.IsEmpty() { + result.OriginTimestamp = &h.OriginTimestamp + } + return result } func (h *MVCCValueHeader) crdbTest() MVCCValueHeaderCrdbTest { - return (MVCCValueHeaderCrdbTest)(*h) + result := MVCCValueHeaderCrdbTest{ + KVNemesisSeq: h.KVNemesisSeq, + LocalTimestamp: h.LocalTimestamp, + OmitInRangefeeds: h.OmitInRangefeeds, + ImportEpoch: h.ImportEpoch, + OriginID: h.OriginID, + } + if !h.OriginTimestamp.IsEmpty() { + result.OriginTimestamp = &h.OriginTimestamp + } + return result } // Size implements protoutil.Message. diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 0fa7f7d76cab..d9253c18858b 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -2671,6 +2671,9 @@ func mvccPutInternal( versionValue.OmitInRangefeeds = opts.OmitInRangefeeds versionValue.ImportEpoch = opts.ImportEpoch versionValue.OriginID = opts.OriginID + if opts.OriginTimestamp.IsSet() { + versionValue.OriginTimestamp = opts.OriginTimestamp + } if buildutil.CrdbTestBuild { if seq, seqOK := kvnemesisutil.FromContext(ctx); seqOK { @@ -4622,6 +4625,7 @@ type MVCCWriteOptions struct { OmitInRangefeeds bool ImportEpoch uint32 OriginID uint32 + OriginTimestamp hlc.Timestamp // MaxLockConflicts is a maximum number of conflicting locks collected before // returning LockConflictError. 
Even single-key writes can encounter multiple // conflicting shared locks, so the limit is important to bound the number of diff --git a/pkg/storage/mvcc_test.go b/pkg/storage/mvcc_test.go index bbe325d9f37c..08e89c773030 100644 --- a/pkg/storage/mvcc_test.go +++ b/pkg/storage/mvcc_test.go @@ -390,6 +390,31 @@ func TestMVCCGetWithValueHeader(t *testing.T) { } } +func TestMVCCValueHeaderOriginTimestamp(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + engine := NewDefaultInMemForTesting() + defer engine.Close() + + // Put a value with a non-zero origin timestamp. + _, err := MVCCPut(ctx, engine, testKey1, hlc.Timestamp{WallTime: 1}, value1, MVCCWriteOptions{OriginTimestamp: hlc.Timestamp{WallTime: 1}}) + require.NoError(t, err) + + valueRes, vh, err := MVCCGetWithValueHeader(ctx, engine, testKey1, hlc.Timestamp{WallTime: 3}, MVCCGetOptions{}) + require.NoError(t, err) + require.NotNil(t, valueRes.Value) + require.Equal(t, hlc.Timestamp{WallTime: 1}, vh.OriginTimestamp) + + // Ensure a regular put has no origin timestamp. + _, err = MVCCPut(ctx, engine, testKey1, hlc.Timestamp{WallTime: 2}, value1, MVCCWriteOptions{}) + require.NoError(t, err) + valueRes, vh, err = MVCCGetWithValueHeader(ctx, engine, testKey1, hlc.Timestamp{WallTime: 3}, MVCCGetOptions{}) + require.NoError(t, err) + require.Zero(t, vh.OriginTimestamp) +} + // TestMVCCValueHeadersForRangefeeds tests that the value headers used by // rangefeeds are set correctly. 
func TestMVCCValueHeadersForRangefeeds(t *testing.T) { diff --git a/pkg/storage/mvcc_value.go b/pkg/storage/mvcc_value.go index 298f1146a8f6..a1b690b4bdd8 100644 --- a/pkg/storage/mvcc_value.go +++ b/pkg/storage/mvcc_value.go @@ -131,6 +131,9 @@ func (v MVCCValue) SafeFormat(w redact.SafePrinter, _ rune) { if v.OriginID != 0 { fields = append(fields, fmt.Sprintf("originID=%v", v.OriginID)) } + if v.OriginTimestamp.IsSet() { + fields = append(fields, fmt.Sprintf("originTs=%s", v.OriginTimestamp)) + } w.Print(strings.Join(fields, ", ")) w.Printf("}") } diff --git a/pkg/storage/mvcc_value_test.go b/pkg/storage/mvcc_value_test.go index 97f5d4ec2fe6..58faac5a82ab 100644 --- a/pkg/storage/mvcc_value_test.go +++ b/pkg/storage/mvcc_value_test.go @@ -13,6 +13,7 @@ package storage import ( "bytes" "fmt" + "math" "strings" "testing" @@ -122,6 +123,7 @@ func TestMVCCValueFormat(t *testing.T) { intVal.SetInt(17) var importEpoch uint32 = 3 var originID uint32 = 1 + var originTs = hlc.Timestamp{WallTime: 1, Logical: 1} valHeader := enginepb.MVCCValueHeader{} valHeader.LocalTimestamp = hlc.ClockTimestamp{WallTime: 9} @@ -129,30 +131,36 @@ func TestMVCCValueFormat(t *testing.T) { valHeaderFull := valHeader valHeaderFull.ImportEpoch = importEpoch valHeaderFull.OriginID = originID + valHeaderFull.OriginTimestamp = originTs valHeaderWithJobIDOnly := enginepb.MVCCValueHeader{ImportEpoch: importEpoch} valHeaderWithOriginIDOnly := enginepb.MVCCValueHeader{OriginID: originID} + valHeaderWithOriginTsOnly := enginepb.MVCCValueHeader{OriginTimestamp: originTs} + testcases := map[string]struct { val MVCCValue expect string }{ - "tombstone": {val: MVCCValue{}, expect: "/"}, - "bytes": {val: MVCCValue{Value: strVal}, expect: "/BYTES/foo"}, - "int": {val: MVCCValue{Value: intVal}, expect: "/INT/17"}, - "header+tombstone": {val: MVCCValue{MVCCValueHeader: valHeader}, expect: "{localTs=0.000000009,0}/"}, - "header+bytes": {val: MVCCValue{MVCCValueHeader: valHeader, Value: strVal}, expect: 
"{localTs=0.000000009,0}/BYTES/foo"}, - "header+int": {val: MVCCValue{MVCCValueHeader: valHeader, Value: intVal}, expect: "{localTs=0.000000009,0}/INT/17"}, - "headerJobIDOnly+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobIDOnly}, expect: "{importEpoch=3}/"}, - "headerJobIDOnly+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobIDOnly, Value: strVal}, expect: "{importEpoch=3}/BYTES/foo"}, - "headerJobIDOnly+int": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobIDOnly, Value: intVal}, expect: "{importEpoch=3}/INT/17"}, - "headerOriginIDOnly+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginIDOnly}, expect: "{originID=1}/"}, - "headerOriginIDOnly+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginIDOnly, Value: strVal}, expect: "{originID=1}/BYTES/foo"}, - "headerOriginIDOnly+int": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginIDOnly, Value: intVal}, expect: "{originID=1}/INT/17"}, - "headerFull+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderFull}, expect: "{localTs=0.000000009,0, importEpoch=3, originID=1}/"}, - "headerFull+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderFull, Value: strVal}, expect: "{localTs=0.000000009,0, importEpoch=3, originID=1}/BYTES/foo"}, - "headerFull+int": {val: MVCCValue{MVCCValueHeader: valHeaderFull, Value: intVal}, expect: "{localTs=0.000000009,0, importEpoch=3, originID=1}/INT/17"}, + "tombstone": {val: MVCCValue{}, expect: "/"}, + "bytes": {val: MVCCValue{Value: strVal}, expect: "/BYTES/foo"}, + "int": {val: MVCCValue{Value: intVal}, expect: "/INT/17"}, + "timestamp+tombstone": {val: MVCCValue{MVCCValueHeader: valHeader}, expect: "{localTs=0.000000009,0}/"}, + "timestamp+bytes": {val: MVCCValue{MVCCValueHeader: valHeader, Value: strVal}, expect: "{localTs=0.000000009,0}/BYTES/foo"}, + "timestamp+int": {val: MVCCValue{MVCCValueHeader: valHeader, Value: intVal}, expect: "{localTs=0.000000009,0}/INT/17"}, + "jobid+tombstone": {val: MVCCValue{MVCCValueHeader: 
valHeaderWithJobIDOnly}, expect: "{importEpoch=3}/"}, + "jobid+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobIDOnly, Value: strVal}, expect: "{importEpoch=3}/BYTES/foo"}, + "jobid+int": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobIDOnly, Value: intVal}, expect: "{importEpoch=3}/INT/17"}, + "originid+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginIDOnly}, expect: "{originID=1}/"}, + "originid+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginIDOnly, Value: strVal}, expect: "{originID=1}/BYTES/foo"}, + "originid+int": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginIDOnly, Value: intVal}, expect: "{originID=1}/INT/17"}, + "fullheader+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderFull}, expect: "{localTs=0.000000009,0, importEpoch=3, originID=1, originTs=0.000000001,1}/"}, + "fullheader+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderFull, Value: strVal}, expect: "{localTs=0.000000009,0, importEpoch=3, originID=1, originTs=0.000000001,1}/BYTES/foo"}, + "fullheader+int": {val: MVCCValue{MVCCValueHeader: valHeaderFull, Value: intVal}, expect: "{localTs=0.000000009,0, importEpoch=3, originID=1, originTs=0.000000001,1}/INT/17"}, + "origints+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginTsOnly}, expect: "{originTs=0.000000001,1}/"}, + "origints+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginTsOnly, Value: strVal}, expect: "{originTs=0.000000001,1}/BYTES/foo"}, + "origints+int": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginTsOnly, Value: intVal}, expect: "{originTs=0.000000001,1}/INT/17"}, } for name, tc := range testcases { t.Run(name, func(t *testing.T) { @@ -163,6 +171,9 @@ func TestMVCCValueFormat(t *testing.T) { func TestEncodeDecodeMVCCValue(t *testing.T) { defer leaktest.AfterTest(t)() + + // Force the production fast path by deactivating the + // `disableSimpleValueEncoding` test constant. 
DisableMetamorphicSimpleValueEncoding(t) var strVal, intVal roachpb.Value @@ -170,6 +181,7 @@ func TestEncodeDecodeMVCCValue(t *testing.T) { intVal.SetInt(17) var importEpoch uint32 = 3 var originID uint32 = 1 + var originTs = hlc.Timestamp{WallTime: math.MaxInt64, Logical: 1} valHeader := enginepb.MVCCValueHeader{} valHeader.LocalTimestamp = hlc.ClockTimestamp{WallTime: 9} @@ -177,9 +189,11 @@ func TestEncodeDecodeMVCCValue(t *testing.T) { valHeaderFull := valHeader valHeaderFull.ImportEpoch = importEpoch valHeaderFull.OriginID = originID + valHeaderFull.OriginTimestamp = originTs valHeaderWithJobIDOnly := enginepb.MVCCValueHeader{ImportEpoch: importEpoch} valHeaderWithOriginIDOnly := enginepb.MVCCValueHeader{OriginID: originID} + valHeaderWithOriginTsOnly := enginepb.MVCCValueHeader{OriginTimestamp: originTs} testcases := map[string]struct { val MVCCValue @@ -199,6 +213,9 @@ func TestEncodeDecodeMVCCValue(t *testing.T) { "headerFull+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderFull}}, "headerFull+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderFull, Value: strVal}}, "headerFull+int": {val: MVCCValue{MVCCValueHeader: valHeaderFull, Value: intVal}}, + "headerOriginTsOnly+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginTsOnly}}, + "headerOriginTsOnly+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginTsOnly, Value: strVal}}, + "headerOriginTsOnly+int": {val: MVCCValue{MVCCValueHeader: valHeaderWithOriginTsOnly, Value: intVal}}, } w := echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name())) for name, tc := range testcases { diff --git a/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_bytes b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_bytes index f942c3ef7837..9f7917db2019 100644 --- a/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_bytes +++ b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_bytes @@ -1,3 +1,3 @@ echo ---- -encoded: 00000008650a020809200328010000000003666f6f 
+encoded: 00000016650a02080920032801320c08ffffffffffffffff7f10010000000003666f6f diff --git a/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_int b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_int index bac5aab290a1..cc95a1ce2acc 100644 --- a/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_int +++ b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_int @@ -1,3 +1,3 @@ echo ---- -encoded: 00000008650a02080920032801000000000122 +encoded: 00000016650a02080920032801320c08ffffffffffffffff7f1001000000000122 diff --git a/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_tombstone b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_tombstone index 65f57cf5097b..c9e9ac9f4c7b 100644 --- a/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_tombstone +++ b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerFull_tombstone @@ -1,3 +1,3 @@ echo ---- -encoded: 00000008650a02080920032801 +encoded: 00000016650a02080920032801320c08ffffffffffffffff7f1001 diff --git a/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerOriginTsOnly_bytes b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerOriginTsOnly_bytes new file mode 100644 index 000000000000..f25e945494a4 --- /dev/null +++ b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerOriginTsOnly_bytes @@ -0,0 +1,3 @@ +echo +---- +encoded: 00000010650a00320c08ffffffffffffffff7f10010000000003666f6f diff --git a/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerOriginTsOnly_int b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerOriginTsOnly_int new file mode 100644 index 000000000000..31e1af460005 --- /dev/null +++ b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerOriginTsOnly_int @@ -0,0 +1,3 @@ +echo +---- +encoded: 00000010650a00320c08ffffffffffffffff7f1001000000000122 diff --git a/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerOriginTsOnly_tombstone b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerOriginTsOnly_tombstone new file mode 100644 
index 000000000000..9e6cee4550a8 --- /dev/null +++ b/pkg/storage/testdata/TestEncodeDecodeMVCCValue/headerOriginTsOnly_tombstone @@ -0,0 +1,3 @@ +echo +---- +encoded: 00000010650a00320c08ffffffffffffffff7f1001