diff --git a/pkg/kv/kvserver/rangefeed/registry_helper_test.go b/pkg/kv/kvserver/rangefeed/registry_helper_test.go index ba56b139a42e..a4b4040ae7ac 100644 --- a/pkg/kv/kvserver/rangefeed/registry_helper_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_helper_test.go @@ -15,9 +15,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" ) var ( @@ -32,6 +34,112 @@ var ( spXY = roachpb.Span{Key: keyX, EndKey: keyY} ) +var txn1, txn2 = uuid.MakeV4(), uuid.MakeV4() + +var keyValues = []storage.MVCCKeyValue{ + makeKV("a", "valA1", 10), + makeIntent("c", txn1, "txnKeyC", 15), + makeProvisionalKV("c", "txnKeyC", 15), + makeKV("c", "valC2", 11), + makeKV("c", "valC1", 9), + makeIntent("d", txn2, "txnKeyD", 21), + makeProvisionalKV("d", "txnKeyD", 21), + makeKV("d", "valD5", 20), + makeKV("d", "valD4", 19), + makeKV("d", "valD3", 16), + makeKV("d", "valD2", 3), + makeKV("d", "valD1", 1), + makeKV("e", "valE3", 6), + makeKV("e", "valE2", 5), + makeKV("e", "valE1", 4), + makeKV("f", "valF3", 7), + makeKV("f", "valF2", 6), + makeKV("f", "valF1", 5), + makeKV("h", "valH1", 15), + makeKV("m", "valM1", 1), + makeIntent("n", txn1, "txnKeyN", 12), + makeProvisionalKV("n", "txnKeyN", 12), + makeIntent("r", txn1, "txnKeyR", 19), + makeProvisionalKV("r", "txnKeyR", 19), + makeKV("r", "valR1", 4), + makeKV("s", "valS3", 21), + makeKVWithHeader("s", "valS2", 20, enginepb.MVCCValueHeader{OmitInRangefeeds: true}), + makeKV("s", "valS1", 19), + makeIntent("w", txn1, "txnKeyW", 3), + makeProvisionalKV("w", "txnKeyW", 3), + makeIntent("z", txn2, "txnKeyZ", 21), + makeProvisionalKV("z", "txnKeyZ", 21), + makeKV("z", "valZ1", 4), +} + +func expEvents(filtering bool) []*kvpb.RangeFeedEvent { + expEvents := []*kvpb.RangeFeedEvent{ + rangeFeedValueWithPrev( + roachpb.Key("d"), + makeValWithTs("valD3", 16), + makeVal("valD2"), + ), + rangeFeedValueWithPrev( + roachpb.Key("d"), + makeValWithTs("valD4", 19), + makeVal("valD3"), + ), + rangeFeedValueWithPrev( + roachpb.Key("d"), + makeValWithTs("valD5", 20), + makeVal("valD4"), + ), + rangeFeedValueWithPrev( + roachpb.Key("e"), + makeValWithTs("valE2", 5), + makeVal("valE1"), + ), + rangeFeedValueWithPrev( + roachpb.Key("e"), + makeValWithTs("valE3", 6), + makeVal("valE2"), + ), + rangeFeedValue( + roachpb.Key("f"), + makeValWithTs("valF1", 5), + ), + rangeFeedValueWithPrev( + roachpb.Key("f"), + makeValWithTs("valF2", 6), + makeVal("valF1"), + ), + rangeFeedValueWithPrev( + roachpb.Key("f"), + makeValWithTs("valF3", 7), + makeVal("valF2"), + ), + rangeFeedValue( + roachpb.Key("h"), + makeValWithTs("valH1", 15), + ), + rangeFeedValue( + roachpb.Key("s"), + makeValWithTs("valS1", 19), + ), + } + if !filtering { + expEvents = append(expEvents, + rangeFeedValueWithPrev( + roachpb.Key("s"), + makeValWithTs("valS2", 20), + makeVal("valS1"), + )) + } + expEvents = append(expEvents, rangeFeedValueWithPrev( + roachpb.Key("s"), + makeValWithTs("valS3", 21), + // Even though the event that wrote val2 is filtered out, we want to keep + // val2 as a previous value of the next event. + makeVal("valS2"), + )) + return expEvents +} + type testStream struct { ctx context.Context ctxDone func() diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index 1aa72b1a1184..322d0c420562 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -14,11 +14,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/stretchr/testify/require" ) @@ -118,43 +116,7 @@ func TestRegistrationCatchUpScan(t *testing.T) { testutils.RunTrueAndFalse(t, "withFiltering", func(t *testing.T, withFiltering bool) { // Run a catch-up scan for a registration over a test // iterator with the following keys. - txn1, txn2 := uuid.MakeV4(), uuid.MakeV4() - iter := newTestIterator([]storage.MVCCKeyValue{ - makeKV("a", "valA1", 10), - makeIntent("c", txn1, "txnKeyC", 15), - makeProvisionalKV("c", "txnKeyC", 15), - makeKV("c", "valC2", 11), - makeKV("c", "valC1", 9), - makeIntent("d", txn2, "txnKeyD", 21), - makeProvisionalKV("d", "txnKeyD", 21), - makeKV("d", "valD5", 20), - makeKV("d", "valD4", 19), - makeKV("d", "valD3", 16), - makeKV("d", "valD2", 3), - makeKV("d", "valD1", 1), - makeKV("e", "valE3", 6), - makeKV("e", "valE2", 5), - makeKV("e", "valE1", 4), - makeKV("f", "valF3", 7), - makeKV("f", "valF2", 6), - makeKV("f", "valF1", 5), - makeKV("h", "valH1", 15), - makeKV("m", "valM1", 1), - makeIntent("n", txn1, "txnKeyN", 12), - makeProvisionalKV("n", "txnKeyN", 12), - makeIntent("r", txn1, "txnKeyR", 19), - makeProvisionalKV("r", "txnKeyR", 19), - makeKV("r", "valR1", 4), - makeKV("s", "valS3", 21), - makeKVWithHeader("s", "valS2", 20, enginepb.MVCCValueHeader{OmitInRangefeeds: true}), - makeKV("s", "valS1", 19), - makeIntent("w", txn1, "txnKeyW", 3), - makeProvisionalKV("w", "txnKeyW", 3), - makeIntent("z", txn2, "txnKeyZ", 21), - makeProvisionalKV("z", "txnKeyZ", 21), - makeKV("z", "valZ1", 4), - }, roachpb.Key("w")) - + iter := newTestIterator(keyValues, roachpb.Key("w")) r := newTestRegistration(roachpb.Span{ Key: roachpb.Key("d"), EndKey: roachpb.Key("w"), @@ -166,71 +128,7 @@ func TestRegistrationCatchUpScan(t *testing.T) { require.NotZero(t, r.metrics.RangeFeedCatchUpScanNanos.Count()) // Compare the events sent on the registration's Stream to the expected events. - expEvents := []*kvpb.RangeFeedEvent{ - rangeFeedValueWithPrev( - roachpb.Key("d"), - makeValWithTs("valD3", 16), - makeVal("valD2"), - ), - rangeFeedValueWithPrev( - roachpb.Key("d"), - makeValWithTs("valD4", 19), - makeVal("valD3"), - ), - rangeFeedValueWithPrev( - roachpb.Key("d"), - makeValWithTs("valD5", 20), - makeVal("valD4"), - ), - rangeFeedValueWithPrev( - roachpb.Key("e"), - makeValWithTs("valE2", 5), - makeVal("valE1"), - ), - rangeFeedValueWithPrev( - roachpb.Key("e"), - makeValWithTs("valE3", 6), - makeVal("valE2"), - ), - rangeFeedValue( - roachpb.Key("f"), - makeValWithTs("valF1", 5), - ), - rangeFeedValueWithPrev( - roachpb.Key("f"), - makeValWithTs("valF2", 6), - makeVal("valF1"), - ), - rangeFeedValueWithPrev( - roachpb.Key("f"), - makeValWithTs("valF3", 7), - makeVal("valF2"), - ), - rangeFeedValue( - roachpb.Key("h"), - makeValWithTs("valH1", 15), - ), - rangeFeedValue( - roachpb.Key("s"), - makeValWithTs("valS1", 19), - ), - } - if !withFiltering { - expEvents = append(expEvents, - rangeFeedValueWithPrev( - roachpb.Key("s"), - makeValWithTs("valS2", 20), - makeVal("valS1"), - )) - } - expEvents = append(expEvents, rangeFeedValueWithPrev( - roachpb.Key("s"), - makeValWithTs("valS3", 21), - // Even though the event that wrote val2 is filtered out, we want to keep - // val2 as a previous value of the next event. - makeVal("valS2"), - )) - require.Equal(t, expEvents, r.Events()) + require.Equal(t, expEvents(withFiltering), r.Events()) }) }