diff --git a/go/test/endtoend/vreplication/global_routing_test.go b/go/test/endtoend/vreplication/global_routing_test.go new file mode 100644 index 00000000000..25339d46f1e --- /dev/null +++ b/go/test/endtoend/vreplication/global_routing_test.go @@ -0,0 +1,288 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vreplication + +import ( + "fmt" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/log" + vttablet "vitess.io/vitess/go/vt/vttablet/common" +) + +/* +* Create unsharded keyspace with three tables, t1,t2,t3, empty vschema. Confirm global routing works. Also try @primary, @replica +* Add another unsharded keyspace with t2,t4,t5. Check what happens +* Add MoveTables into sharded keyspace moving t2, t4. Check what happens on Create/SwitchRead/SwitchWrites/Complete +* Check global routing for each with an expectation. 
+* First BEFORE and then AFTEr the logic change + */ + +func getSchema(tables []string) string { + var createSQL string + for _, table := range tables { + createSQL += "CREATE TABLE " + table + " (id int primary key, val varchar(32)) ENGINE=InnoDB;\n" + } + return createSQL +} + +func insertData(t *testing.T, keyspace string, table string, id int, val string) { + vtgateConn, cancel := getVTGateConn() + defer cancel() + _, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s.%s(id, val) values(%d, '%s')", keyspace, table, id, val), 1, false) + require.NoError(t, err) +} + +var ksS1VSchema = ` +{ + "sharded": true, + "vindexes": { + "reverse_bits": { + "type": "reverse_bits" + } + }, + "tables": { + "t2": { + "column_vindexes": [ + { + "column": "id", + "name": "reverse_bits" + } + ] + }, + "t4": { + "column_vindexes": [ + { + "column": "id", + "name": "reverse_bits" + } + ] + } + } +} +` + +func isGlobal(t *testing.T, tables []string, expectedVal string) bool { + vtgateConn, cancel := getVTGateConn() + defer cancel() + var err error + asExpected := true + for _, table := range tables { + for _, target := range []string{"", "@primary", "@replica"} { + _, err = vtgateConn.ExecuteFetch(fmt.Sprintf("use %s", target), 1, false) + require.NoError(t, err) + rs, err := vtgateConn.ExecuteFetch(fmt.Sprintf("select * from %s", table), 1, false) + require.NoError(t, err) + gotVal := rs.Rows[0][1].ToString() + if gotVal != expectedVal { + asExpected = false + } + } + } + return asExpected +} + +func isNotGlobal(t *testing.T, tables []string) bool { + vtgateConn, cancel := getVTGateConn() + defer cancel() + var err error + asExpected := true + for _, table := range tables { + for _, target := range []string{"", "@primary", "@replica"} { + _, err = vtgateConn.ExecuteFetch(fmt.Sprintf("use %s", target), 1, false) + require.NoError(t, err) + _, err := vtgateConn.ExecuteFetch(fmt.Sprintf("select * from %s", table), 1, false) + log.Infof("Got error %v, for table %s.%s", err, 
table, target) + if err == nil || !strings.Contains(err.Error(), fmt.Sprintf("table %s not found", table)) { + asExpected = false + } + } + } + return asExpected +} + +func isAmbiguous(t *testing.T, tables []string) bool { + vtgateConn, cancel := getVTGateConn() + defer cancel() + var err error + asExpected := true + for _, table := range tables { + for _, target := range []string{"", "@primary", "@replica"} { + _, err = vtgateConn.ExecuteFetch(fmt.Sprintf("use %s", target), 1, false) + require.NoError(t, err) + _, err := vtgateConn.ExecuteFetch(fmt.Sprintf("select * from %s", table), 1, false) + if err == nil || !strings.Contains(err.Error(), "ambiguous") { + asExpected = false + } + } + } + return asExpected +} + +type tGlobalRoutingTestConfig struct { + ksU1, ksU2, ksS1 string + ksU1Tables, ksU2Tables, ksS1Tables []string +} + +var globalRoutingTestConfig tGlobalRoutingTestConfig = tGlobalRoutingTestConfig{ + ksU1: "unsharded1", + ksU2: "unsharded2", + ksS1: "sharded1", + ksU1Tables: []string{"t1", "t2", "t3"}, + ksU2Tables: []string{"t2", "t4", "t5"}, + ksS1Tables: []string{"t2", "t4"}, +} + +type tGlobalRoutingTestExpectationFuncs struct { + postKsU1, postKsU2, postKsS1 func(t *testing.T) +} + +type globalRoutingTestCase struct { + markAsGlobal bool + unshardedHaveVSchema bool +} + +func setExpectations(t *testing.T) *map[globalRoutingTestCase]*tGlobalRoutingTestExpectationFuncs { + var exp = make(map[globalRoutingTestCase]*tGlobalRoutingTestExpectationFuncs) + exp[globalRoutingTestCase{unshardedHaveVSchema: false, markAsGlobal: false}] = &tGlobalRoutingTestExpectationFuncs{ + postKsU1: func(t *testing.T) { + require.True(t, isGlobal(t, []string{"t1", "t2", "t3"}, globalRoutingTestConfig.ksU1)) + }, + postKsU2: func(t *testing.T) { + require.True(t, isNotGlobal(t, []string{"t1", "t2", "t3"})) + require.True(t, isNotGlobal(t, []string{"t4", "t5"})) + }, + postKsS1: func(t *testing.T) { + require.True(t, isGlobal(t, []string{"t2", "t4"}, 
globalRoutingTestConfig.ksS1)) + require.True(t, isNotGlobal(t, []string{"t1", "t3"})) + require.True(t, isNotGlobal(t, []string{"t5"})) + }, + } + exp[globalRoutingTestCase{unshardedHaveVSchema: false, markAsGlobal: true}] = &tGlobalRoutingTestExpectationFuncs{ + postKsU1: func(t *testing.T) { + require.True(t, isGlobal(t, []string{"t1", "t2", "t3"}, globalRoutingTestConfig.ksU1)) + }, + postKsU2: func(t *testing.T) { + require.True(t, isGlobal(t, []string{"t1", "t3"}, globalRoutingTestConfig.ksU1)) + require.True(t, isGlobal(t, []string{"t4", "t5"}, globalRoutingTestConfig.ksU2)) + require.True(t, isAmbiguous(t, []string{"t2"})) + }, + postKsS1: func(t *testing.T) { + require.True(t, isGlobal(t, []string{"t2", "t4"}, globalRoutingTestConfig.ksS1)) + require.True(t, isGlobal(t, []string{"t1", "t3"}, globalRoutingTestConfig.ksU1)) + require.True(t, isGlobal(t, []string{"t5"}, globalRoutingTestConfig.ksU2)) + }, + } + exp[globalRoutingTestCase{unshardedHaveVSchema: true, markAsGlobal: false}] = &tGlobalRoutingTestExpectationFuncs{ + postKsU1: func(t *testing.T) { + require.True(t, isGlobal(t, []string{"t1", "t2", "t3"}, globalRoutingTestConfig.ksU1)) + }, + postKsU2: func(t *testing.T) { + require.True(t, isGlobal(t, []string{"t1", "t3"}, globalRoutingTestConfig.ksU1)) + require.True(t, isGlobal(t, []string{"t4", "t5"}, globalRoutingTestConfig.ksU2)) + require.True(t, isAmbiguous(t, []string{"t2"})) + }, + postKsS1: func(t *testing.T) { + require.True(t, isAmbiguous(t, []string{"t2", "t4"})) + require.True(t, isGlobal(t, []string{"t1", "t3"}, globalRoutingTestConfig.ksU1)) + require.True(t, isGlobal(t, []string{"t5"}, globalRoutingTestConfig.ksU2)) + }, + } + exp[globalRoutingTestCase{unshardedHaveVSchema: true, markAsGlobal: true}] = + exp[globalRoutingTestCase{unshardedHaveVSchema: true, markAsGlobal: false}] + return &exp + +} + +func TestGlobalRouting(t *testing.T) { + exp := *setExpectations(t) + testCases := []globalRoutingTestCase{ + {unshardedHaveVSchema: 
false, markAsGlobal: true}, + {unshardedHaveVSchema: false, markAsGlobal: false}, + {unshardedHaveVSchema: true, markAsGlobal: true}, + {unshardedHaveVSchema: true, markAsGlobal: false}, + } + for _, tc := range testCases { + funcs := exp[tc] + require.NotNil(t, funcs) + testGlobalRouting(t, tc.markAsGlobal, tc.unshardedHaveVSchema, funcs) + } +} + +func getUnshardedVschema(unshardedHaveVSchema bool, tables []string) string { + if !unshardedHaveVSchema { + return "" + } + vschema := `{"tables": {` + for i, table := range tables { + if i != 0 { + vschema += `,` + } + vschema += fmt.Sprintf(`"%s": {}`, table) + } + vschema += `}}` + return vschema +} + +func testGlobalRouting(t *testing.T, markAsGlobal, unshardedHaveVSchema bool, funcs *tGlobalRoutingTestExpectationFuncs) { + setSidecarDBName("_vt") + vttablet.InitVReplicationConfigDefaults() + extraVTGateArgs = append(extraVTGateArgs, fmt.Sprintf("--mark_unique_unsharded_tables_as_global=%t", markAsGlobal)) + + vc = NewVitessCluster(t, nil) + defer vc.TearDown() + zone1 := vc.Cells["zone1"] + config := globalRoutingTestConfig + vc.AddKeyspace(t, []*Cell{zone1}, config.ksU1, "0", getUnshardedVschema(unshardedHaveVSchema, config.ksU1Tables), + getSchema(config.ksU1Tables), 1, 0, 100, nil) + verifyClusterHealth(t, vc) + for _, table := range config.ksU1Tables { + insertData(t, config.ksU1, table, 1, config.ksU1) + } + time.Sleep(5 * time.Second) + funcs.postKsU1(t) + + vc.AddKeyspace(t, []*Cell{zone1}, config.ksU2, "0", getUnshardedVschema(unshardedHaveVSchema, config.ksU2Tables), + getSchema(config.ksU2Tables), 1, 0, 200, nil) + verifyClusterHealth(t, vc) + for _, table := range config.ksU2Tables { + insertData(t, config.ksU2, table, 1, config.ksU2) + } + time.Sleep(5 * time.Second) // FIXME: wait for the mysql replication to catch up on the replica + rebuild(t) + funcs.postKsU2(t) + + vc.AddKeyspace(t, []*Cell{zone1}, config.ksS1, "-80,80-", ksS1VSchema, getSchema(config.ksS1Tables), 1, 0, 300, nil) + 
verifyClusterHealth(t, vc) + for _, table := range config.ksS1Tables { + insertData(t, config.ksS1, table, 1, config.ksS1) + } + time.Sleep(5 * time.Second) + rebuild(t) + funcs.postKsS1(t) +} + +func rebuild(t *testing.T) { + err := vc.VtctldClient.ExecuteCommand("RebuildVSchemaGraph") + require.NoError(t, err) + err = vc.VtctldClient.ExecuteCommand("RebuildKeyspaceGraph", globalRoutingTestConfig.ksU1, globalRoutingTestConfig.ksU2) + require.NoError(t, err) +} diff --git a/go/test/endtoend/vreplication/r b/go/test/endtoend/vreplication/r new file mode 100755 index 00000000000..aab2f7ea320 --- /dev/null +++ b/go/test/endtoend/vreplication/r @@ -0,0 +1,50 @@ +cleanup() { + rm -rf /Users/rohit/vtdataroot/* + killall vtctldclient vtctld vttablet vtgate vtorc mysqlctl etcd + ps | grep /vtdataroot | awk '{print $1}' | xargs kill -9 + ps x | grep mysql | grep -v grep | awk '{print $1}' | xargs kill -9 + + + rm -rf ~/vtdataroot/* + mkdir -p ~/vtdataroot + mkdir -p ~/vtdataroot/tmp + mkdir -p ~/vtdataroot/ext + mkdir -p ~/vtdataroot/ext/tmp +} + +declare -a tests=("TestMaterializeVtctld") +declare -a tests=("TestMaterializeView") +declare -a tests=("TestMultiTenantSimple") +declare -a tests=("TestReferenceTableMaterialize") +declare -a tests=("WorkflowDuplicateKeyBackoff") +declare -a tests=("BasicVreplicationWorkflow") +declare -a tests=("CellAlias") +declare -a tests=("TestVSchemaChangesUnderLoad") +declare -a tests=("TestMoveTablesBuffering") +declare -a tests=("MigrateSharded") +declare -a tests=("CopyParallel") +declare -a tests=("TestWorkflowDuplicateKeyBackoff2") +declare -a tests=("TestMoveTablesBasic") +declare -a tests=("TestVtctldclientCLI") + +declare -a tests=("TestBasicVreplicationWorkflow") +declare -a tests=("TestLookupVindex") +declare -a tests=("TestGlobalRouting") + + +export VREPLICATION_E2E_DEBUG= +export CI=true +for test in ${tests[@]}; do + clear + echo "================ Starting $test ==============" + echo + cleanup + go test -timeout 20m 
-failfast -v --alsologtostderr -run $test + RET=$? + echo "================ Done $test ================" + echo + say "$test done" + exit $RET +done + + diff --git a/go/test/endtoend/vreplication/r2 b/go/test/endtoend/vreplication/r2 new file mode 100755 index 00000000000..a69dc5eaa6f --- /dev/null +++ b/go/test/endtoend/vreplication/r2 @@ -0,0 +1,17 @@ +cleanup() { + rm -rf /Users/rohit/vtdataroot/* + rm -f queries.txt + killall vtctldclient vtctld vttablet vtgate vtorc mysqlctl etcd + ps | grep /vtdataroot | awk '{print $1}' | xargs kill -9 + + rm -rf ~/vtdataroot/* + mkdir -p ~/vtdataroot + mkdir -p ~/vtdataroot/tmp + mkdir -p ~/vtdataroot/ext + mkdir -p ~/vtdataroot/ext/tmp +} +cleanup +cd ~/vitess +make +cd ~/vitess/go/test/endtoend/vreplication +./r diff --git a/go/vt/vtgate/vindexes/global_routing.md b/go/vt/vtgate/vindexes/global_routing.md new file mode 100644 index 00000000000..d6be38c5f16 --- /dev/null +++ b/go/vt/vtgate/vindexes/global_routing.md @@ -0,0 +1,102 @@ +# RFC: Enhancements to Global Routing for Unsharded Tables + +## Overview + +This RFC proposes enhancements to the global routing mechanism in Vitess. The goal is to ensure +that unique tables from keyspaces without defined vschemas are globally routable. This document discusses the +current global +routing features, the proposed changes, and provides examples to illustrate the impact of these changes. + +## Motivation + +Vitess has two ways of addressing tables: using qualified names where the keyspace is specified or using unqualified +table names. Example: `keyspace1.table1` vs. `table1`. Tables are currently only globally routable if + +* there is only one keyspace in the cluster, which is unsharded, or +* there are multiple keyspaces and the unique tables are defined in the `vschema` for all keyspaces from which you + want tables to be globally routable. + +This logic can have catastrophic consequences. 
One example: + +* The user has a single unsharded keyspace `unsharded1` and is using unqualified table names, because their app had been + written + using unqualified names while targeting a regular MySQL db. `vtgate` correctly routes this because there is only + one unsharded keyspace: there is no vschema to consult because it was not necessary at this point to create one + for the unsharded keyspace. +* The user wants to reshard some large tables. Say a `MoveTables` workflow is started into a sharded + keyspace, +say, + `sharded1`, which, obviously, has a vschema with all tables defined. Say `table2` is moved in this workflow but + `table1` continues to live in `unsharded1`. + So tables with the same name `table2` exist in both the user's db as well as in `sharded1`. Routing rules have + already been set up so that the unqualified tables are routed to `unsharded1`. `sharded1` is still not serving, so + the tables in `sharded1` are "invisible" to `vtgate`. +* When the `MoveTables` has caught up and the user does a `SwitchWrites` (i.e. `SwitchTraffic` for primary), `sharded1` + is now serving, *but* routing rules are updated to point to `sharded1`, so the global routing _just_ works. +* The problem comes when the user does a `Complete` on the workflow to clean things up: the routing rules are removed, and because `unsharded1` has no vschema and is no longer the only keyspace, unqualified references to tables that remain in it (such as `table1`) are no longer globally routable. + +A similar problem also holds if the user started with just one unsharded keyspace in Vitess and uses MoveTables to move +some of the tables into a sharded keyspace. + +## Current Global Routing Features + +### Globally Routable Tables + +In Vitess, a table is considered globally routable if it meets the following criteria: + +- The table exists in a single unsharded keyspace. +- The table exists in multiple unsharded keyspaces but is identical in schema and vindex configuration. + +### Ambiguous Tables + +A table is considered ambiguous if: + +- The table exists in multiple unsharded keyspaces with different schemas or vindex configurations. +- The table exists in both sharded and unsharded keyspaces. 
+ +### Example + +Consider the following keyspaces and tables: + +- `keyspace1` (unsharded): `table1`, `table2` +- `keyspace2` (unsharded): `table2`, `table3` +- `keyspace3` (sharded): `table4` + +In this scenario: + +- `table1` is globally routable. +- `table2` is ambiguous because it exists in multiple unsharded keyspaces with potentially different configurations. +- `table3` is globally routable. +- `table4` is globally routable because it exists in a sharded keyspace for which vschema is defined. + +## Proposed Changes + +The proposed changes aim to make tables from keyspaces without defined vschemas globally routable. Specifically, the +changes include: + +1. **Automatic Inclusion of Tables from Keyspaces without Vschemas**: Tables from keyspaces that do not have vschemas + defined will be automatically included in the global routing table. +2. **Conflict Resolution**: In case of conflicts (i.e., tables with the same name in multiple keyspaces), the table will + be marked as ambiguous. + +### Example + +Consider the following keyspaces and tables after the proposed changes: + +- `keyspace1` (unsharded, no vschema): `table1`, `table2` +- `keyspace2` (unsharded, no vschema): `table2`, `table3` +- `keyspace3` (sharded): `table4` + +In this scenario: + +- `table1` is globally routable. +- `table2` is ambiguous because it exists in multiple unsharded keyspaces. +- `table3` is globally routable. +- `table4` is still globally routable because it is unique and defined in the vschema of its sharded keyspace. + +## Benefits + +- **Improved Global Routing**: Ensures that tables from keyspaces without vschemas are included in the global routing + table, preventing hard-down situations as detailed above. +- **Simplified Configuration**: Reduces the need for explicit vschema definitions for unsharded keyspaces, simplifying + the configuration process. 
diff --git a/go/vt/vtgate/vindexes/vschema.go b/go/vt/vtgate/vindexes/vschema.go index 1847dd0539e..be5e7884ee4 100644 --- a/go/vt/vtgate/vindexes/vschema.go +++ b/go/vt/vtgate/vindexes/vschema.go @@ -25,6 +25,8 @@ import ( "strings" "time" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/ptr" "vitess.io/vitess/go/json2" @@ -1354,6 +1356,7 @@ func (vschema *VSchema) findGlobalTable( } if ok { + log.Infof(">>>>>>>> ambiguous %s, global tables are %+v", tablename, vschema.globalTables) return nil, vterrors.Errorf( vtrpcpb.Code_FAILED_PRECONDITION, "ambiguous table reference: %s", diff --git a/go/vt/vtgate/vschema_manager.go b/go/vt/vtgate/vschema_manager.go index 290971c45ed..9f408cf56e2 100644 --- a/go/vt/vtgate/vschema_manager.go +++ b/go/vt/vtgate/vschema_manager.go @@ -197,7 +197,10 @@ func (vm *VSchemaManager) buildAndEnhanceVSchema(v *vschemapb.SrvVSchema) *vinde // Add tables from schema tracking into globally routable tables, if they are not already present. // We need to skip if already present, to handle the case where MoveTables has switched traffic // and removed the source vschema but not from the source database because user asked to --keep-data - vindexes.AddAdditionalGlobalTables(v, vschema) + if MarkUniqueUnshardedTablesAsGlobal { + log.Infof(">>>>>>>> Marking unique unsharded tables as global") + vindexes.AddAdditionalGlobalTables(v, vschema) + } } return vschema } diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 8bab05479dd..200db05b9d6 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -135,6 +135,8 @@ var ( warmingReadsPercent = 0 warmingReadsQueryTimeout = 5 * time.Second warmingReadsConcurrency = 500 + + MarkUniqueUnshardedTablesAsGlobal = true ) func registerFlags(fs *pflag.FlagSet) { @@ -173,6 +175,7 @@ func registerFlags(fs *pflag.FlagSet) { fs.IntVar(&warmingReadsConcurrency, "warming-reads-concurrency", 500, "Number of concurrent warming reads allowed") fs.DurationVar(&warmingReadsQueryTimeout, 
"warming-reads-query-timeout", 5*time.Second, "Timeout of warming read queries") + fs.BoolVar(&MarkUniqueUnshardedTablesAsGlobal, "mark_unique_unsharded_tables_as_global", MarkUniqueUnshardedTablesAsGlobal, "Mark unique unsharded tables as global tables") viperutil.BindFlags(fs, enableOnlineDDL, enableDirectDDL) }