diff --git a/pkg/ccl/testccl/sqlccl/BUILD.bazel b/pkg/ccl/testccl/sqlccl/BUILD.bazel index edbfcab19cf9..bb12483f8086 100644 --- a/pkg/ccl/testccl/sqlccl/BUILD.bazel +++ b/pkg/ccl/testccl/sqlccl/BUILD.bazel @@ -11,6 +11,7 @@ go_test( "session_revival_test.go", "show_create_test.go", "show_transfer_state_test.go", + "standby_read_test.go", "temp_table_clean_test.go", "tenant_gc_test.go", ], @@ -39,6 +40,8 @@ go_test( "//pkg/settings/cluster", "//pkg/spanconfig", "//pkg/sql", + "//pkg/sql/catalog/lease", + "//pkg/sql/catalog/replication", "//pkg/sql/gcjob", "//pkg/sql/isql", "//pkg/sql/lexbase", diff --git a/pkg/ccl/testccl/sqlccl/standby_read_test.go b/pkg/ccl/testccl/sqlccl/standby_read_test.go new file mode 100644 index 000000000000..943621cd6ce0 --- /dev/null +++ b/pkg/ccl/testccl/sqlccl/standby_read_test.go @@ -0,0 +1,110 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package sqlccl + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/replication" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +func TestStandbyRead(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testcases := []struct { + standby bool + stmt string + expected [][]string + }{ + {stmt: `CREATE TABLE abc (a INT PRIMARY KEY, b INT, c JSONB)`}, + {stmt: `INSERT INTO abc VALUES (1, 10, '[100]'), (3, 30, '[300]'), (5, 50, '[500]')`}, + {stmt: `ALTER TABLE abc SPLIT AT VALUES (2), (4)`}, + {stmt: `SELECT count(*) FROM [SHOW TABLES]`, expected: [][]string{{"1"}}}, + {stmt: `SELECT count(*) FROM abc`, expected: [][]string{{"3"}}}, + {standby: true, stmt: `SELECT count(*) FROM [SHOW TABLES]`, expected: [][]string{{"1"}}}, + {standby: true, stmt: `SELECT count(*) FROM abc`, expected: [][]string{{"3"}}}, + } + + ctx := context.Background() + tc := serverutils.StartCluster(t, 3, /* numNodes */ + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + DefaultTestTenant: base.TestControlsTenantsExplicitly, + }, + }) + defer tc.Stopper().Stop(ctx) + ts := tc.Server(0) + + _, srcDB, err := ts.TenantController().StartSharedProcessTenant(ctx, + base.TestSharedProcessTenantArgs{ + TenantID: serverutils.TestTenantID(), + TenantName: "src", + UseDatabase: "defaultdb", + }, + ) + require.NoError(t, err) + dstTenant, dstDB, err := ts.TenantController().StartSharedProcessTenant(ctx, + base.TestSharedProcessTenantArgs{ + TenantID: serverutils.TestTenantID2(), + TenantName: "dst", + UseDatabase: "defaultdb", + }, + ) + require.NoError(t, err) + + srcRunner := sqlutils.MakeSQLRunner(srcDB) + dstRunner := sqlutils.MakeSQLRunner(dstDB) + dstInternal := dstTenant.InternalDB().(*sql.InternalDB) + + dstRunner.Exec(t, `SET CLUSTER SETTING sql.defaults.distsql = always`) + dstRunner.Exec(t, `SET distsql = always`) + + waitForReplication := func() { + now := ts.Clock().Now() + err := replication.SetupOrAdvanceStandbyReaderCatalog( + ctx, serverutils.TestTenantID(), now, dstInternal, dstTenant.ClusterSettings(), + ) + if err != nil { + t.Fatal(err) + } + now = ts.Clock().Now() + lm := dstTenant.LeaseManager().(*lease.Manager) + testutils.SucceedsSoon(t, func() error { + if lm.GetSafeReplicationTS().Less(now) { + return errors.AssertionFailedf("waiting for descriptor close timestamp to catch up") + } + return nil + }) + } + + for _, tc := range testcases { + var runner *sqlutils.SQLRunner + if tc.standby { + waitForReplication() + runner = dstRunner + } else { + runner = srcRunner + } + if tc.expected == nil { + runner.Exec(t, tc.stmt) + } else { + runner.CheckQueryResultsRetry(t, tc.stmt, tc.expected) + } + } +} diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go index 45298a1465a2..2532ea42c7c0 100644 --- a/pkg/sql/row/kv_batch_fetcher.go +++ b/pkg/sql/row/kv_batch_fetcher.go @@ -298,7 +298,6 @@ func makeExternalSpanSendFunc( ext *fetchpb.IndexFetchSpec_ExternalRowData, db *kv.DB, batchRequestsIssued *int64, ) sendFunc { return func(ctx context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) { - ba.Timestamp = ext.AsOf for _, req := range ba.Requests { // We only allow external row data for a few known types of request. switch r := req.GetInner().(type) { @@ -310,12 +309,34 @@ func makeExternalSpanSendFunc( } } log.VEventf(ctx, 2, "kv external fetcher: sending a batch with %d requests", len(ba.Requests)) - res, err := db.NonTransactionalSender().Send(ctx, ba) + + // Open a new transaction with fixed timestamp set to the external + // timestamp. We must do this with txn.Send rather than using + // db.NonTransactionalSender to get the 1-to-1 request-response guarantee + // required by txnKVFetcher. + // TODO(michae2): Explore whether we should keep this transaction open for + // the duration of the surrounding transaction. + var res *kvpb.BatchResponse + err := db.TxnWithAdmissionControl( + ctx, ba.AdmissionHeader.Source, admissionpb.WorkPriority(ba.AdmissionHeader.Priority), + kv.SteppingDisabled, + func(ctx context.Context, txn *kv.Txn) error { + if err := txn.SetFixedTimestamp(ctx, ext.AsOf); err != nil { + return err + } + var err *kvpb.Error + res, err = txn.Send(ctx, ba) + if err != nil { + return err.GoError() + } + return nil + }) + // Note that in some code paths there is no concurrency when using the // sendFunc, but we choose to unconditionally use atomics here since its // overhead should be negligible in the grand scheme of things anyway. atomic.AddInt64(batchRequestsIssued, 1) - return res, err.GoError() + return res, err } }