diff --git a/.github/workflows/vitess_tester_vtgate.yml b/.github/workflows/vitess_tester_vtgate.yml
index 887d6a4d095..64b429a3724 100644
--- a/.github/workflows/vitess_tester_vtgate.yml
+++ b/.github/workflows/vitess_tester_vtgate.yml
@@ -112,7 +112,7 @@ jobs:
go install github.com/vitessio/go-junit-report@HEAD
# install vitess tester
- go install github.com/vitessio/vitess-tester@89dd933a9ea0e15f69ca58b9c8ea09a358762cca
+ go install github.com/vitessio/vitess-tester/go/vt@374fd9f495c1afd3b6bea9d4ec7728119714055
- name: Setup launchable dependencies
if: steps.skip-workflow.outputs.is_draft == 'false' && steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && github.base_ref == 'main'
@@ -144,9 +144,9 @@ jobs:
# We go over all the directories in the given path.
# If there is a vschema file there, we use it, otherwise we let vitess-tester autogenerate it.
if [ -f $dir/vschema.json ]; then
- vitess-tester --xunit --vschema "$dir"vschema.json $dir/*.test
+ vt tester --xunit --vschema "$dir"vschema.json $dir/*.test
else
- vitess-tester --sharded --xunit $dir/*.test
+ vt tester --sharded --xunit $dir/*.test
fi
# Number the reports by changing their file names.
mv report.xml report"$i".xml
diff --git a/changelog/21.0/21.0.0/summary.md b/changelog/21.0/21.0.0/summary.md
index bda5595dd70..4de89929039 100644
--- a/changelog/21.0/21.0.0/summary.md
+++ b/changelog/21.0/21.0.0/summary.md
@@ -18,6 +18,7 @@
- **[Dynamic VReplication Configuration](#dynamic-vreplication-configuration)**
- **[Reference Table Materialization](#reference-table-materialization)**
- **[New VEXPLAIN Modes: TRACE and KEYS](#new-vexplain-modes)**
+ - **[Errant GTID Detection on VTTablets](#errant-gtid-vttablet)**
## Major Changes
@@ -161,8 +162,8 @@ so please read the [documentation](https://vitess.io/docs/21.0/user-guides/opera
### Dynamic VReplication Configuration
-Previously, many of the configuration options for VReplication Workflows had to be provided using vttablet flags. This
-meant that any change to VReplication configuration required restarting vttablets. We now allow these to be overridden
+Previously, many of the configuration options for VReplication Workflows had to be provided using VTTablet flags. This
+meant that any change to VReplication configuration required restarting VTTablets. We now allow these to be overridden
while creating a workflow or dynamically after the workflow is already in progress.
### Reference Table Materialization
@@ -200,3 +201,11 @@ filter columns (potential candidates for indexes, primary keys, or sharding keys
These new `VEXPLAIN` modes enhance Vitess's query analysis capabilities, allowing for more informed decisions about sharding
strategies and query optimization.
+
+### Errant GTID Detection on VTTablets
+
+VTTablets now run an errant GTID detection logic before they join the replication stream. So, if a replica has an errant GTID, it will
+not start replicating from the primary. This protects us from running into situations which are very difficult to recover from.
+
+For users running with the vitess-operator on Kubernetes, this change means that replica tablets with errant GTIDs will have broken
+replication and will report as unready. Users will need to manually replace and clean up these errant replica tablets.
diff --git a/go/mysql/replication/mysql56_gtid_set.go b/go/mysql/replication/mysql56_gtid_set.go
index 348af5b5274..918a6ec3b6b 100644
--- a/go/mysql/replication/mysql56_gtid_set.go
+++ b/go/mysql/replication/mysql56_gtid_set.go
@@ -467,6 +467,33 @@ func (set Mysql56GTIDSet) SIDBlock() []byte {
return buf.Bytes()
}
+// ErrantGTIDsOnReplica gets the errant GTIDs on the replica by comparing against the primary position and UUID.
+func ErrantGTIDsOnReplica(replicaPosition Position, primaryPosition Position) (string, error) {
+ replicaGTIDSet, replicaOk := replicaPosition.GTIDSet.(Mysql56GTIDSet)
+ primaryGTIDSet, primaryOk := primaryPosition.GTIDSet.(Mysql56GTIDSet)
+
+ // Currently we only support errant GTID detection for MySQL 56 flavour.
+ if !replicaOk || !primaryOk {
+ return "", nil
+ }
+
+ // Calculate the difference between the replica and primary GTID sets.
+ diffSet := replicaGTIDSet.Difference(primaryGTIDSet)
+ return diffSet.String(), nil
+}
+
+// RemoveUUID removes a specific UUID from the gtid set.
+func (set Mysql56GTIDSet) RemoveUUID(uuid SID) Mysql56GTIDSet {
+ newSet := make(Mysql56GTIDSet)
+ for sid, intervals := range set {
+ if sid == uuid {
+ continue
+ }
+ newSet[sid] = intervals
+ }
+ return newSet
+}
+
// Difference will supply the difference between the receiver and supplied Mysql56GTIDSets, and supply the result
// as a Mysql56GTIDSet.
func (set Mysql56GTIDSet) Difference(other Mysql56GTIDSet) Mysql56GTIDSet {
diff --git a/go/mysql/replication/mysql56_gtid_set_test.go b/go/mysql/replication/mysql56_gtid_set_test.go
index 323baae3885..bff23679afb 100644
--- a/go/mysql/replication/mysql56_gtid_set_test.go
+++ b/go/mysql/replication/mysql56_gtid_set_test.go
@@ -704,3 +704,88 @@ func BenchmarkMySQL56GTIDParsing(b *testing.B) {
}
}
}
+
+func TestErrantGTIDsOnReplica(t *testing.T) {
+ tests := []struct {
+ name string
+ replicaPosition string
+ primaryPosition string
+ errantGtidWanted string
+ wantErr string
+ }{
+ {
+ name: "Empty replica position",
+ replicaPosition: "MySQL56/",
+ primaryPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8",
+ errantGtidWanted: "",
+ }, {
+ name: "Empty primary position",
+ replicaPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8",
+ primaryPosition: "MySQL56/",
+ errantGtidWanted: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8",
+ }, {
+ name: "Empty primary position - with multiple errant gtids",
+ replicaPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1",
+ primaryPosition: "MySQL56/",
+ errantGtidWanted: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1",
+ }, {
+ name: "Single errant GTID",
+ replicaPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1,8bc65cca-3fe4-11ed-bbfb-091034d48bd3:34",
+ primaryPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-50,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1-30",
+ errantGtidWanted: "8bc65cca-3fe4-11ed-bbfb-091034d48bd3:34",
+ }, {
+ name: "Multiple errant GTID",
+ replicaPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1-32,8bc65cca-3fe4-11ed-bbfb-091034d48bd3:3-35",
+ primaryPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-50,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1-30,8bc65cca-3fe4-11ed-bbfb-091034d48bd3:34",
+ errantGtidWanted: "8bc65cca-3fe4-11ed-bbfb-091034d48b3e:31-32,8bc65cca-3fe4-11ed-bbfb-091034d48bd3:3-33:35",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ replPos, err := DecodePosition(tt.replicaPosition)
+ require.NoError(t, err)
+ primaryPos, err := DecodePosition(tt.primaryPosition)
+ require.NoError(t, err)
+ errantGTIDs, err := ErrantGTIDsOnReplica(replPos, primaryPos)
+ if tt.wantErr != "" {
+ require.ErrorContains(t, err, tt.wantErr)
+ } else {
+ require.NoError(t, err)
+ require.EqualValues(t, tt.errantGtidWanted, errantGTIDs)
+ }
+
+ })
+ }
+}
+
+func TestMysql56GTIDSet_RemoveUUID(t *testing.T) {
+ tests := []struct {
+ name string
+ initialSet string
+ uuid string
+ wantSet string
+ }{
+ {
+ name: "Remove unknown UUID",
+ initialSet: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1:4-24",
+ uuid: "8bc65c84-3fe4-11ed-a912-257f0fcde6c9",
+ wantSet: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1:4-24",
+ },
+ {
+ name: "Remove a single UUID",
+ initialSet: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1:4-24",
+ uuid: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9",
+ wantSet: "8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1:4-24",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ gtidSet, err := ParseMysql56GTIDSet(tt.initialSet)
+ require.NoError(t, err)
+ sid, err := ParseSID(tt.uuid)
+ require.NoError(t, err)
+ gtidSet = gtidSet.RemoveUUID(sid)
+ require.EqualValues(t, tt.wantSet, gtidSet.String())
+ })
+ }
+}
diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go
index 95995903a83..bf810a5a319 100644
--- a/go/test/endtoend/cluster/cluster_process.go
+++ b/go/test/endtoend/cluster/cluster_process.go
@@ -378,103 +378,9 @@ func (cluster *LocalProcessCluster) startKeyspace(keyspace Keyspace, shardNames
// Create the keyspace if it doesn't already exist.
_ = cluster.VtctlProcess.CreateKeyspace(keyspace.Name, keyspace.SidecarDBName, keyspace.DurabilityPolicy)
for _, shardName := range shardNames {
- shard := &Shard{
- Name: shardName,
- }
- log.Infof("Starting shard: %v", shardName)
- var mysqlctlProcessList []*exec.Cmd
- for i := 0; i < totalTabletsRequired; i++ {
- // instantiate vttablet object with reserved ports
- tabletUID := cluster.GetAndReserveTabletUID()
- tablet := &Vttablet{
- TabletUID: tabletUID,
- Type: "replica",
- HTTPPort: cluster.GetAndReservePort(),
- GrpcPort: cluster.GetAndReservePort(),
- MySQLPort: cluster.GetAndReservePort(),
- Alias: fmt.Sprintf("%s-%010d", cluster.Cell, tabletUID),
- }
- if i == 0 { // Make the first one as primary
- tablet.Type = "primary"
- } else if i == totalTabletsRequired-1 && rdonly { // Make the last one as rdonly if rdonly flag is passed
- tablet.Type = "rdonly"
- }
- // Start Mysqlctl process
- log.Infof("Starting mysqlctl for table uid %d, mysql port %d", tablet.TabletUID, tablet.MySQLPort)
- mysqlctlProcess, err := MysqlCtlProcessInstanceOptionalInit(tablet.TabletUID, tablet.MySQLPort, cluster.TmpDirectory, !cluster.ReusingVTDATAROOT)
- if err != nil {
- return err
- }
- switch tablet.Type {
- case "primary":
- mysqlctlProcess.Binary += os.Getenv("PRIMARY_TABLET_BINARY_SUFFIX")
- case "replica":
- mysqlctlProcess.Binary += os.Getenv("REPLICA_TABLET_BINARY_SUFFIX")
- }
- tablet.MysqlctlProcess = *mysqlctlProcess
- proc, err := tablet.MysqlctlProcess.StartProcess()
- if err != nil {
- log.Errorf("error starting mysqlctl process: %v, %v", tablet.MysqlctldProcess, err)
- return err
- }
- mysqlctlProcessList = append(mysqlctlProcessList, proc)
-
- // start vttablet process
- tablet.VttabletProcess = VttabletProcessInstance(
- tablet.HTTPPort,
- tablet.GrpcPort,
- tablet.TabletUID,
- cluster.Cell,
- shardName,
- keyspace.Name,
- cluster.VtctldProcess.Port,
- tablet.Type,
- cluster.TopoProcess.Port,
- cluster.Hostname,
- cluster.TmpDirectory,
- cluster.VtTabletExtraArgs,
- cluster.DefaultCharset)
- switch tablet.Type {
- case "primary":
- tablet.VttabletProcess.Binary += os.Getenv("PRIMARY_TABLET_BINARY_SUFFIX")
- case "replica":
- tablet.VttabletProcess.Binary += os.Getenv("REPLICA_TABLET_BINARY_SUFFIX")
- }
- tablet.Alias = tablet.VttabletProcess.TabletPath
- if cluster.ReusingVTDATAROOT {
- tablet.VttabletProcess.ServingStatus = "SERVING"
- }
- shard.Vttablets = append(shard.Vttablets, tablet)
- // Apply customizations
- for _, customizer := range customizers {
- if f, ok := customizer.(func(*VttabletProcess)); ok {
- f(tablet.VttabletProcess)
- } else {
- return fmt.Errorf("type mismatch on customizer: %T", customizer)
- }
- }
- }
-
- // wait till all mysqlctl is instantiated
- for _, proc := range mysqlctlProcessList {
- if err = proc.Wait(); err != nil {
- log.Errorf("unable to start mysql process %v: %v", proc, err)
- return err
- }
- }
- for _, tablet := range shard.Vttablets {
- log.Infof("Starting vttablet for tablet uid %d, grpc port %d", tablet.TabletUID, tablet.GrpcPort)
-
- if err = tablet.VttabletProcess.Setup(); err != nil {
- log.Errorf("error starting vttablet for tablet uid %d, grpc port %d: %v", tablet.TabletUID, tablet.GrpcPort, err)
- return
- }
- }
-
- // Make first tablet as primary
- if err = cluster.VtctldClientProcess.InitializeShard(keyspace.Name, shardName, cluster.Cell, shard.Vttablets[0].TabletUID); err != nil {
- log.Errorf("error running InitializeShard on keyspace %v, shard %v: %v", keyspace.Name, shardName, err)
- return
+ shard, err := cluster.AddShard(keyspace.Name, shardName, totalTabletsRequired, rdonly, customizers)
+ if err != nil {
+ return err
}
keyspace.Shards = append(keyspace.Shards, *shard)
}
@@ -488,33 +394,135 @@ func (cluster *LocalProcessCluster) startKeyspace(keyspace Keyspace, shardNames
}
if !existingKeyspace {
cluster.Keyspaces = append(cluster.Keyspaces, keyspace)
- }
- // Apply Schema SQL
- if keyspace.SchemaSQL != "" {
- if err = cluster.VtctldClientProcess.ApplySchema(keyspace.Name, keyspace.SchemaSQL); err != nil {
- log.Errorf("error applying schema: %v, %v", keyspace.SchemaSQL, err)
- return
+ // Apply Schema SQL
+ if keyspace.SchemaSQL != "" {
+ if err = cluster.VtctldClientProcess.ApplySchema(keyspace.Name, keyspace.SchemaSQL); err != nil {
+ log.Errorf("error applying schema: %v, %v", keyspace.SchemaSQL, err)
+ return
+ }
+ }
+
+ // Apply VSchema
+ if keyspace.VSchema != "" {
+ if err = cluster.VtctldClientProcess.ApplyVSchema(keyspace.Name, keyspace.VSchema); err != nil {
+ log.Errorf("error applying vschema: %v, %v", keyspace.VSchema, err)
+ return
+ }
+ }
+
+ log.Infof("Done creating keyspace: %v ", keyspace.Name)
+
+ err = cluster.StartVTOrc(keyspace.Name)
+ if err != nil {
+ log.Errorf("Error starting VTOrc - %v", err)
+ return err
}
}
- // Apply VSchema
- if keyspace.VSchema != "" {
- if err = cluster.VtctldClientProcess.ApplyVSchema(keyspace.Name, keyspace.VSchema); err != nil {
- log.Errorf("error applying vschema: %v, %v", keyspace.VSchema, err)
- return
+ return
+}
+
+func (cluster *LocalProcessCluster) AddShard(keyspaceName string, shardName string, totalTabletsRequired int, rdonly bool, customizers []any) (*Shard, error) {
+ shard := &Shard{
+ Name: shardName,
+ }
+ log.Infof("Starting shard: %v", shardName)
+ var mysqlctlProcessList []*exec.Cmd
+ for i := 0; i < totalTabletsRequired; i++ {
+ // instantiate vttablet object with reserved ports
+ tabletUID := cluster.GetAndReserveTabletUID()
+ tablet := &Vttablet{
+ TabletUID: tabletUID,
+ Type: "replica",
+ HTTPPort: cluster.GetAndReservePort(),
+ GrpcPort: cluster.GetAndReservePort(),
+ MySQLPort: cluster.GetAndReservePort(),
+ Alias: fmt.Sprintf("%s-%010d", cluster.Cell, tabletUID),
+ }
+ if i == 0 { // Make the first one as primary
+ tablet.Type = "primary"
+ } else if i == totalTabletsRequired-1 && rdonly { // Make the last one as rdonly if rdonly flag is passed
+ tablet.Type = "rdonly"
+ }
+ // Start Mysqlctl process
+ log.Infof("Starting mysqlctl for table uid %d, mysql port %d", tablet.TabletUID, tablet.MySQLPort)
+ mysqlctlProcess, err := MysqlCtlProcessInstanceOptionalInit(tablet.TabletUID, tablet.MySQLPort, cluster.TmpDirectory, !cluster.ReusingVTDATAROOT)
+ if err != nil {
+ return nil, err
+ }
+ switch tablet.Type {
+ case "primary":
+ mysqlctlProcess.Binary += os.Getenv("PRIMARY_TABLET_BINARY_SUFFIX")
+ case "replica":
+ mysqlctlProcess.Binary += os.Getenv("REPLICA_TABLET_BINARY_SUFFIX")
+ }
+ tablet.MysqlctlProcess = *mysqlctlProcess
+ proc, err := tablet.MysqlctlProcess.StartProcess()
+ if err != nil {
+ log.Errorf("error starting mysqlctl process: %v, %v", tablet.MysqlctldProcess, err)
+ return nil, err
+ }
+ mysqlctlProcessList = append(mysqlctlProcessList, proc)
+
+ // start vttablet process
+ tablet.VttabletProcess = VttabletProcessInstance(
+ tablet.HTTPPort,
+ tablet.GrpcPort,
+ tablet.TabletUID,
+ cluster.Cell,
+ shardName,
+ keyspaceName,
+ cluster.VtctldProcess.Port,
+ tablet.Type,
+ cluster.TopoProcess.Port,
+ cluster.Hostname,
+ cluster.TmpDirectory,
+ cluster.VtTabletExtraArgs,
+ cluster.DefaultCharset)
+ switch tablet.Type {
+ case "primary":
+ tablet.VttabletProcess.Binary += os.Getenv("PRIMARY_TABLET_BINARY_SUFFIX")
+ case "replica":
+ tablet.VttabletProcess.Binary += os.Getenv("REPLICA_TABLET_BINARY_SUFFIX")
+ }
+ tablet.Alias = tablet.VttabletProcess.TabletPath
+ if cluster.ReusingVTDATAROOT {
+ tablet.VttabletProcess.ServingStatus = "SERVING"
+ }
+ shard.Vttablets = append(shard.Vttablets, tablet)
+ // Apply customizations
+ for _, customizer := range customizers {
+ if f, ok := customizer.(func(*VttabletProcess)); ok {
+ f(tablet.VttabletProcess)
+ } else {
+ return nil, fmt.Errorf("type mismatch on customizer: %T", customizer)
+ }
}
}
- log.Infof("Done creating keyspace: %v ", keyspace.Name)
+ // wait till all mysqlctl is instantiated
+ for _, proc := range mysqlctlProcessList {
+ if err := proc.Wait(); err != nil {
+ log.Errorf("unable to start mysql process %v: %v", proc, err)
+ return nil, err
+ }
+ }
+ for _, tablet := range shard.Vttablets {
+ log.Infof("Starting vttablet for tablet uid %d, grpc port %d", tablet.TabletUID, tablet.GrpcPort)
- err = cluster.StartVTOrc(keyspace.Name)
- if err != nil {
- log.Errorf("Error starting VTOrc - %v", err)
- return err
+ if err := tablet.VttabletProcess.Setup(); err != nil {
+ log.Errorf("error starting vttablet for tablet uid %d, grpc port %d: %v", tablet.TabletUID, tablet.GrpcPort, err)
+ return nil, err
+ }
}
- return
+ // Make first tablet as primary
+ if err := cluster.VtctldClientProcess.InitializeShard(keyspaceName, shardName, cluster.Cell, shard.Vttablets[0].TabletUID); err != nil {
+ log.Errorf("error running InitializeShard on keyspace %v, shard %v: %v", keyspaceName, shardName, err)
+ return nil, err
+ }
+ return shard, nil
}
// StartUnshardedKeyspaceLegacy starts unshared keyspace with shard name as "0"
diff --git a/go/test/endtoend/cluster/reshard.go b/go/test/endtoend/cluster/reshard.go
new file mode 100644
index 00000000000..af36d4543c8
--- /dev/null
+++ b/go/test/endtoend/cluster/reshard.go
@@ -0,0 +1,102 @@
+/*
+Copyright 2024 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cluster
+
+import (
+ "fmt"
+ "slices"
+ "strings"
+ "testing"
+ "time"
+)
+
+// ReshardWorkflow is used to store the information needed to run
+// Reshard commands.
+type ReshardWorkflow struct {
+ t *testing.T
+ clusterInstance *LocalProcessCluster
+ workflowName string
+ targetKs string
+ sourceShards string
+ targetShards string
+}
+
+// NewReshard creates a new ReshardWorkflow.
+func NewReshard(t *testing.T, clusterInstance *LocalProcessCluster, workflowName, targetKs, targetShards, srcShards string) *ReshardWorkflow {
+ return &ReshardWorkflow{
+ t: t,
+ clusterInstance: clusterInstance,
+ workflowName: workflowName,
+ targetKs: targetKs,
+ sourceShards: srcShards,
+ targetShards: targetShards,
+ }
+}
+
+func (rw *ReshardWorkflow) Create() (string, error) {
+ args := []string{"Create"}
+ if rw.sourceShards != "" {
+ args = append(args, "--source-shards="+rw.sourceShards)
+ }
+ if rw.targetShards != "" {
+ args = append(args, "--target-shards="+rw.targetShards)
+ }
+
+ return rw.exec(args...)
+}
+
+func (rw *ReshardWorkflow) exec(args ...string) (string, error) {
+ args2 := []string{"Reshard", "--workflow=" + rw.workflowName, "--target-keyspace=" + rw.targetKs}
+ args2 = append(args2, args...)
+ return rw.clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(args2...)
+}
+
+func (rw *ReshardWorkflow) SwitchReadsAndWrites() (string, error) {
+ return rw.exec("SwitchTraffic")
+}
+
+func (rw *ReshardWorkflow) ReverseReadsAndWrites() (string, error) {
+ return rw.exec("ReverseTraffic")
+}
+
+func (rw *ReshardWorkflow) Cancel() (string, error) {
+ return rw.exec("Cancel")
+}
+
+func (rw *ReshardWorkflow) Complete() (string, error) {
+ return rw.exec("Complete")
+}
+
+func (rw *ReshardWorkflow) Show() (string, error) {
+ return rw.exec("Show")
+}
+
+func (rw *ReshardWorkflow) WaitForVreplCatchup(timeToWait time.Duration) {
+ targetShards := strings.Split(rw.targetShards, ",")
+ for _, ks := range rw.clusterInstance.Keyspaces {
+ if ks.Name != rw.targetKs {
+ continue
+ }
+ for _, shard := range ks.Shards {
+ if !slices.Contains(targetShards, shard.Name) {
+ continue
+ }
+ vttablet := shard.PrimaryTablet().VttabletProcess
+ vttablet.WaitForVReplicationToCatchup(rw.t, rw.workflowName, fmt.Sprintf("vt_%s", vttablet.Keyspace), "", timeToWait)
+ }
+ }
+}
diff --git a/go/test/endtoend/reparent/plannedreparent/reparent_test.go b/go/test/endtoend/reparent/plannedreparent/reparent_test.go
index 5986056924e..d3907b0bc5b 100644
--- a/go/test/endtoend/reparent/plannedreparent/reparent_test.go
+++ b/go/test/endtoend/reparent/plannedreparent/reparent_test.go
@@ -473,7 +473,7 @@ func TestFullStatus(t *testing.T) {
assert.Equal(t, "ON", primaryStatus.GtidMode)
assert.True(t, primaryStatus.LogReplicaUpdates)
assert.True(t, primaryStatus.LogBinEnabled)
- assert.Regexp(t, `[58]\.[07].*`, primaryStatus.Version)
+ assert.Regexp(t, `[58]\.[074].*`, primaryStatus.Version)
assert.NotEmpty(t, primaryStatus.VersionComment)
replicaTablet := tablets[1]
@@ -527,7 +527,7 @@ func TestFullStatus(t *testing.T) {
assert.Equal(t, "ON", replicaStatus.GtidMode)
assert.True(t, replicaStatus.LogReplicaUpdates)
assert.True(t, replicaStatus.LogBinEnabled)
- assert.Regexp(t, `[58]\.[07].*`, replicaStatus.Version)
+ assert.Regexp(t, `[58]\.[074].*`, replicaStatus.Version)
assert.NotEmpty(t, replicaStatus.VersionComment)
}
diff --git a/go/test/endtoend/transaction/twopc/fuzz/fuzzer_test.go b/go/test/endtoend/transaction/twopc/fuzz/fuzzer_test.go
index 5bbb484ec1e..c908a99e631 100644
--- a/go/test/endtoend/transaction/twopc/fuzz/fuzzer_test.go
+++ b/go/test/endtoend/transaction/twopc/fuzz/fuzzer_test.go
@@ -105,12 +105,12 @@ func TestTwoPCFuzzTest(t *testing.T) {
timeForTesting: 5 * time.Second,
},
{
- name: "Multiple Threads - Multiple Set - PRS, ERS, and MySQL & Vttablet restart, OnlineDDL, MoveTables disruptions",
+ name: "Multiple Threads - Multiple Set - PRS, ERS, and MySQL & Vttablet restart, OnlineDDL, MoveTables, Reshard disruptions",
threads: 4,
updateSets: 4,
timeForTesting: 5 * time.Second,
- clusterDisruptions: []func(t *testing.T){prs, ers, mysqlRestarts, vttabletRestarts, onlineDDLFuzzer, moveTablesFuzzer},
- disruptionProbability: []int{5, 5, 5, 5, 5, 5},
+ clusterDisruptions: []func(t *testing.T){prs, ers, mysqlRestarts, vttabletRestarts, onlineDDLFuzzer, moveTablesFuzzer, reshardFuzzer},
+ disruptionProbability: []int{5, 5, 5, 5, 5, 5, 5},
},
}
@@ -486,6 +486,23 @@ func moveTablesFuzzer(t *testing.T) {
assert.NoError(t, err, output)
}
+// reshardFuzzer runs a Reshard workflow.
+func reshardFuzzer(t *testing.T) {
+ var srcShards, targetShards string
+ shardCount := len(clusterInstance.Keyspaces[0].Shards)
+ if shardCount == 2 {
+ srcShards = "40-"
+ targetShards = "40-80,80-"
+ } else {
+ srcShards = "40-80,80-"
+ targetShards = "40-"
+ }
+ log.Errorf("Reshard from - \"%v\" to \"%v\"", srcShards, targetShards)
+ twopcutil.AddShards(t, clusterInstance, keyspaceName, strings.Split(targetShards, ","))
+ err := twopcutil.RunReshard(t, clusterInstance, "TestTwoPCFuzzTest", keyspaceName, srcShards, targetShards)
+ require.NoError(t, err)
+}
+
func mysqlRestarts(t *testing.T) {
shards := clusterInstance.Keyspaces[0].Shards
shard := shards[rand.Intn(len(shards))]
diff --git a/go/test/endtoend/transaction/twopc/fuzz/main_test.go b/go/test/endtoend/transaction/twopc/fuzz/main_test.go
index 86e524e648f..1b05615d51a 100644
--- a/go/test/endtoend/transaction/twopc/fuzz/main_test.go
+++ b/go/test/endtoend/transaction/twopc/fuzz/main_test.go
@@ -67,6 +67,7 @@ func TestMain(m *testing.M) {
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs,
"--transaction_mode", "TWOPC",
"--grpc_use_effective_callerid",
+ "--tablet_refresh_interval", "2s",
)
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs,
"--twopc_enable",
diff --git a/go/test/endtoend/transaction/twopc/stress/main_test.go b/go/test/endtoend/transaction/twopc/stress/main_test.go
index 05525171d2d..782ffe1ec12 100644
--- a/go/test/endtoend/transaction/twopc/stress/main_test.go
+++ b/go/test/endtoend/transaction/twopc/stress/main_test.go
@@ -67,6 +67,7 @@ func TestMain(m *testing.M) {
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs,
"--transaction_mode", "TWOPC",
"--grpc_use_effective_callerid",
+ "--tablet_refresh_interval", "2s",
)
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs,
"--twopc_enable",
diff --git a/go/test/endtoend/transaction/twopc/stress/stress_test.go b/go/test/endtoend/transaction/twopc/stress/stress_test.go
index 819e7ea48d3..acb8146975b 100644
--- a/go/test/endtoend/transaction/twopc/stress/stress_test.go
+++ b/go/test/endtoend/transaction/twopc/stress/stress_test.go
@@ -47,6 +47,7 @@ func TestDisruptions(t *testing.T) {
testcases := []struct {
disruptionName string
commitDelayTime string
+ setupFunc func(t *testing.T)
disruption func(t *testing.T) error
resetFunc func(t *testing.T)
}{
@@ -57,6 +58,13 @@ func TestDisruptions(t *testing.T) {
return nil
},
},
+ {
+ disruptionName: "Resharding",
+ commitDelayTime: "20",
+ setupFunc: createShard,
+ disruption: mergeShards,
+ resetFunc: splitShardsBack,
+ },
{
disruptionName: "PlannedReparentShard",
commitDelayTime: "5",
@@ -98,6 +106,9 @@ func TestDisruptions(t *testing.T) {
t.Run(fmt.Sprintf("%s-%ss delay", tt.disruptionName, tt.commitDelayTime), func(t *testing.T) {
// Reparent all the shards to first tablet being the primary.
reparentToFirstTablet(t)
+ if tt.setupFunc != nil {
+ tt.setupFunc(t)
+ }
// cleanup all the old data.
conn, closer := start(t)
defer closer()
@@ -156,6 +167,23 @@ func TestDisruptions(t *testing.T) {
}
}
+func mergeShards(t *testing.T) error {
+ return twopcutil.RunReshard(t, clusterInstance, "TestDisruptions", keyspaceName, "40-80,80-", "40-")
+}
+
+func splitShardsBack(t *testing.T) {
+ t.Helper()
+ twopcutil.AddShards(t, clusterInstance, keyspaceName, []string{"40-80", "80-"})
+ err := twopcutil.RunReshard(t, clusterInstance, "TestDisruptions", keyspaceName, "40-", "40-80,80-")
+ require.NoError(t, err)
+}
+
+// createShard creates a new shard in the keyspace that we'll use for Resharding.
+func createShard(t *testing.T) {
+ t.Helper()
+ twopcutil.AddShards(t, clusterInstance, keyspaceName, []string{"40-"})
+}
+
// threadToWrite is a helper function to write to the database in a loop.
func threadToWrite(t *testing.T, ctx context.Context, id int) {
for {
diff --git a/go/test/endtoend/transaction/twopc/utils/utils.go b/go/test/endtoend/transaction/twopc/utils/utils.go
index ecc91245a67..14f3214ae00 100644
--- a/go/test/endtoend/transaction/twopc/utils/utils.go
+++ b/go/test/endtoend/transaction/twopc/utils/utils.go
@@ -21,6 +21,8 @@ import (
"fmt"
"os"
"path"
+ "slices"
+ "strings"
"testing"
"time"
@@ -170,3 +172,52 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, ks string,
}
}
}
+
+func RunReshard(t *testing.T, clusterInstance *cluster.LocalProcessCluster, workflowName, keyspaceName string, sourceShards, targetShards string) error {
+ rw := cluster.NewReshard(t, clusterInstance, workflowName, keyspaceName, targetShards, sourceShards)
+ // Initiate Reshard.
+ output, err := rw.Create()
+ require.NoError(t, err, output)
+ // Wait for vreplication to catchup. Should be very fast since we don't have a lot of rows.
+ rw.WaitForVreplCatchup(10 * time.Second)
+ // SwitchTraffic
+ output, err = rw.SwitchReadsAndWrites()
+ require.NoError(t, err, output)
+ output, err = rw.Complete()
+ require.NoError(t, err, output)
+
+ // When Reshard completes, it has already deleted the source shards from the topo server.
+ // We just need to shutdown the vttablets, and remove them from the cluster.
+ removeShards(t, clusterInstance, keyspaceName, sourceShards)
+ return nil
+}
+
+func removeShards(t *testing.T, clusterInstance *cluster.LocalProcessCluster, keyspaceName string, shards string) {
+ sourceShardsList := strings.Split(shards, ",")
+ var remainingShards []cluster.Shard
+ for idx, keyspace := range clusterInstance.Keyspaces {
+ if keyspace.Name != keyspaceName {
+ continue
+ }
+ for _, shard := range keyspace.Shards {
+ if slices.Contains(sourceShardsList, shard.Name) {
+ for _, vttablet := range shard.Vttablets {
+ err := vttablet.VttabletProcess.TearDown()
+ require.NoError(t, err)
+ }
+ continue
+ }
+ remainingShards = append(remainingShards, shard)
+ }
+ clusterInstance.Keyspaces[idx].Shards = remainingShards
+ }
+}
+
+func AddShards(t *testing.T, clusterInstance *cluster.LocalProcessCluster, keyspaceName string, shardNames []string) {
+ for _, shardName := range shardNames {
+ t.Helper()
+ shard, err := clusterInstance.AddShard(keyspaceName, shardName, 3, false, nil)
+ require.NoError(t, err)
+ clusterInstance.Keyspaces[0].Shards = append(clusterInstance.Keyspaces[0].Shards, *shard)
+ }
+}
diff --git a/go/test/endtoend/vreplication/migrate_test.go b/go/test/endtoend/vreplication/migrate_test.go
index c6518f0fdec..2ccb3158fd9 100644
--- a/go/test/endtoend/vreplication/migrate_test.go
+++ b/go/test/endtoend/vreplication/migrate_test.go
@@ -21,13 +21,12 @@ import (
"strings"
"testing"
+ "github.com/stretchr/testify/require"
"github.com/tidwall/gjson"
+ "vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
- "github.com/stretchr/testify/require"
-
- "vitess.io/vitess/go/mysql"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)
@@ -51,8 +50,15 @@ func insertInitialDataIntoExternalCluster(t *testing.T, conn *mysql.Conn) {
func TestVtctlMigrate(t *testing.T) {
vc = NewVitessCluster(t, nil)
+ oldDefaultReplicas := defaultReplicas
+ oldDefaultRdonly := defaultRdonly
defaultReplicas = 0
defaultRdonly = 0
+ defer func() {
+ defaultReplicas = oldDefaultReplicas
+ defaultRdonly = oldDefaultRdonly
+ }()
+
defer vc.TearDown()
defaultCell := vc.Cells[vc.CellNames[0]]
@@ -314,21 +320,23 @@ func TestVtctldMigrateUnsharded(t *testing.T) {
// doesn't match that of the source cluster. The test migrates from a cluster with keyspace customer to an "external"
// cluster with keyspace rating.
func TestVtctldMigrateSharded(t *testing.T) {
+ setSidecarDBName("_vt")
+ currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
oldDefaultReplicas := defaultReplicas
oldDefaultRdonly := defaultRdonly
- defaultReplicas = 1
- defaultRdonly = 1
+ defaultReplicas = 0
+ defaultRdonly = 0
defer func() {
defaultReplicas = oldDefaultReplicas
defaultRdonly = oldDefaultRdonly
}()
- setSidecarDBName("_vt")
- currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
vc = setupCluster(t)
+ defer vc.TearDown()
+
vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
- defer vc.TearDown()
+
setupCustomerKeyspace(t)
createMoveTablesWorkflow(t, "customer,Lead,datze,customer2")
tstWorkflowSwitchReadsAndWrites(t)
@@ -363,7 +371,7 @@ func TestVtctldMigrateSharded(t *testing.T) {
if output, err = extVc.VtctldClient.ExecuteCommandWithOutput("Migrate",
"--target-keyspace", "rating", "--workflow", "e1",
"create", "--source-keyspace", "customer", "--mount-name", "external", "--all-tables", "--cells=zone1",
- "--tablet-types=primary,replica"); err != nil {
+ "--tablet-types=primary"); err != nil {
require.FailNow(t, "Migrate command failed with %+v : %s\n", err, output)
}
waitForWorkflowState(t, extVc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go
index 1e93a54b850..4c6dea61912 100644
--- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go
+++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go
@@ -335,6 +335,9 @@ func tstWorkflowCancel(t *testing.T) error {
}
func validateReadsRoute(t *testing.T, tabletTypes string, tablet *cluster.VttabletProcess) {
+ if tablet == nil {
+ return
+ }
if tabletTypes == "" {
tabletTypes = "replica,rdonly"
}
@@ -350,11 +353,15 @@ func validateReadsRoute(t *testing.T, tabletTypes string, tablet *cluster.Vttabl
}
func validateReadsRouteToSource(t *testing.T, tabletTypes string) {
- validateReadsRoute(t, tabletTypes, sourceReplicaTab)
+ if sourceReplicaTab != nil {
+ validateReadsRoute(t, tabletTypes, sourceReplicaTab)
+ }
}
func validateReadsRouteToTarget(t *testing.T, tabletTypes string) {
- validateReadsRoute(t, tabletTypes, targetReplicaTab1)
+ if targetReplicaTab1 != nil {
+ validateReadsRoute(t, tabletTypes, targetReplicaTab1)
+ }
}
func validateWritesRouteToSource(t *testing.T) {
@@ -849,8 +856,12 @@ func setupCluster(t *testing.T) *VitessCluster {
insertInitialData(t)
defaultCell := vc.Cells[vc.CellNames[0]]
sourceTab = vc.Cells[defaultCell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet
- sourceReplicaTab = vc.Cells[defaultCell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-101"].Vttablet
- sourceRdonlyTab = vc.Cells[defaultCell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-102"].Vttablet
+ if defaultReplicas > 0 {
+ sourceReplicaTab = vc.Cells[defaultCell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-101"].Vttablet
+ }
+ if defaultRdonly > 0 {
+ sourceRdonlyTab = vc.Cells[defaultCell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-102"].Vttablet
+ }
return vc
}
@@ -864,8 +875,12 @@ func setupCustomerKeyspace(t *testing.T) {
custKs := vc.Cells[defaultCell.Name].Keyspaces["customer"]
targetTab1 = custKs.Shards["-80"].Tablets["zone1-200"].Vttablet
targetTab2 = custKs.Shards["80-"].Tablets["zone1-300"].Vttablet
- targetReplicaTab1 = custKs.Shards["-80"].Tablets["zone1-201"].Vttablet
- targetRdonlyTab1 = custKs.Shards["-80"].Tablets["zone1-202"].Vttablet
+ if defaultReplicas > 0 {
+ targetReplicaTab1 = custKs.Shards["-80"].Tablets["zone1-201"].Vttablet
+ }
+ if defaultRdonly > 0 {
+ targetRdonlyTab1 = custKs.Shards["-80"].Tablets["zone1-202"].Vttablet
+ }
}
func setupCustomer2Keyspace(t *testing.T) {
diff --git a/go/test/endtoend/vtorc/readtopologyinstance/main_test.go b/go/test/endtoend/vtorc/readtopologyinstance/main_test.go
index 98bf16ec596..fa8dc116782 100644
--- a/go/test/endtoend/vtorc/readtopologyinstance/main_test.go
+++ b/go/test/endtoend/vtorc/readtopologyinstance/main_test.go
@@ -78,7 +78,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
assert.Contains(t, primaryInstance.InstanceAlias, "zone1")
assert.NotEqual(t, 0, primaryInstance.ServerID)
assert.Greater(t, len(primaryInstance.ServerUUID), 10)
- assert.Regexp(t, "[58].[70].*", primaryInstance.Version)
+ assert.Regexp(t, "[58].[704].*", primaryInstance.Version)
assert.NotEmpty(t, primaryInstance.VersionComment)
assert.False(t, primaryInstance.ReadOnly)
assert.True(t, primaryInstance.LogBinEnabled)
@@ -128,7 +128,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
assert.Contains(t, replicaInstance.InstanceAlias, "zone1")
assert.NotEqual(t, 0, replicaInstance.ServerID)
assert.Greater(t, len(replicaInstance.ServerUUID), 10)
- assert.Regexp(t, "[58].[70].*", replicaInstance.Version)
+ assert.Regexp(t, "[58].[704].*", replicaInstance.Version)
assert.NotEmpty(t, replicaInstance.VersionComment)
assert.True(t, replicaInstance.ReadOnly)
assert.True(t, replicaInstance.LogBinEnabled)
diff --git a/go/vt/mysqlctl/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon.go
index 2557f7a7e51..1bab1639acb 100644
--- a/go/vt/mysqlctl/fakemysqldaemon.go
+++ b/go/vt/mysqlctl/fakemysqldaemon.go
@@ -81,6 +81,9 @@ type FakeMysqlDaemon struct {
// and ReplicationStatus.
CurrentPrimaryPosition replication.Position
+ // CurrentRelayLogPosition is returned by ReplicationStatus.
+ CurrentRelayLogPosition replication.Position
+
// CurrentSourceFilePosition is used to determine the executed
// file based positioning of the replication source.
CurrentSourceFilePosition replication.Position
@@ -313,6 +316,7 @@ func (fmd *FakeMysqlDaemon) ReplicationStatus(ctx context.Context) (replication.
return replication.ReplicationStatus{
Position: fmd.CurrentPrimaryPosition,
FilePosition: fmd.CurrentSourceFilePosition,
+ RelayLogPosition: fmd.CurrentRelayLogPosition,
RelayLogSourceBinlogEquivalentPosition: fmd.CurrentSourceFilePosition,
ReplicationLagSeconds: fmd.ReplicationLagSeconds,
// Implemented as AND to avoid changing all tests that were
diff --git a/go/vt/vttablet/tabletmanager/restore.go b/go/vt/vttablet/tabletmanager/restore.go
index a56eb4da37b..35587124108 100644
--- a/go/vt/vttablet/tabletmanager/restore.go
+++ b/go/vt/vttablet/tabletmanager/restore.go
@@ -36,18 +36,15 @@ import (
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/mysqlctl/backupstats"
+ binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
+ tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
+ topodatapb "vitess.io/vitess/go/vt/proto/topodata"
+ vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/proto/vttime"
"vitess.io/vitess/go/vt/servenv"
- "vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
- "vitess.io/vitess/go/vt/vttablet/tmclient"
-
- binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
- tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
- topodatapb "vitess.io/vitess/go/vt/proto/topodata"
- vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)
// This file handles the initial backup restore upon startup.
@@ -326,7 +323,7 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L
} else if keyspaceInfo.KeyspaceType == topodatapb.KeyspaceType_NORMAL {
// Reconnect to primary only for "NORMAL" keyspaces
params.Logger.Infof("Restore: starting replication at position %v", pos)
- if err := tm.startReplication(context.Background(), pos, originalType); err != nil {
+ if err := tm.startReplication(ctx, pos, originalType); err != nil {
return err
}
}
@@ -577,47 +574,30 @@ func (tm *TabletManager) disableReplication(ctx context.Context) error {
}
func (tm *TabletManager) startReplication(ctx context.Context, pos replication.Position, tabletType topodatapb.TabletType) error {
- if err := tm.MysqlDaemon.StopReplication(ctx, nil); err != nil {
+ // The first three steps of stopping replication, and setting the replication position,
+ // we want to do even if the context expires, so we use a background context for these tasks.
+ if err := tm.MysqlDaemon.StopReplication(context.Background(), nil); err != nil {
return vterrors.Wrap(err, "failed to stop replication")
}
- if err := tm.MysqlDaemon.ResetReplicationParameters(ctx); err != nil {
+ if err := tm.MysqlDaemon.ResetReplicationParameters(context.Background()); err != nil {
return vterrors.Wrap(err, "failed to reset replication")
}
// Set the position at which to resume from the primary.
- if err := tm.MysqlDaemon.SetReplicationPosition(ctx, pos); err != nil {
+ if err := tm.MysqlDaemon.SetReplicationPosition(context.Background(), pos); err != nil {
return vterrors.Wrap(err, "failed to set replication position")
}
- primary, err := tm.initializeReplication(ctx, tabletType)
+ primaryPosStr, err := tm.initializeReplication(ctx, tabletType)
// If we ran into an error while initializing replication, then there is no point in waiting for catch-up.
// Also, if there is no primary tablet in the shard, we don't need to proceed further.
- if err != nil || primary == nil {
+ if err != nil || primaryPosStr == "" {
return err
}
- // wait for reliable replication_lag_seconds
- // we have pos where we want to resume from
- // if PrimaryPosition is the same, that means no writes
- // have happened to primary, so we are up-to-date
- // otherwise, wait for replica's Position to change from
- // the initial pos before proceeding
- tmc := tmclient.NewTabletManagerClient()
- defer tmc.Close()
- remoteCtx, remoteCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
- defer remoteCancel()
- posStr, err := tmc.PrimaryPosition(remoteCtx, primary.Tablet)
- if err != nil {
- // It is possible that though PrimaryAlias is set, the primary tablet is unreachable
- // Log a warning and let tablet restore in that case
- // If we had instead considered this fatal, all tablets would crash-loop
- // until a primary appears, which would make it impossible to elect a primary.
- log.Warningf("Can't get primary replication position after restore: %v", err)
- return nil
- }
- primaryPos, err := replication.DecodePosition(posStr)
+ primaryPos, err := replication.DecodePosition(primaryPosStr)
if err != nil {
- return vterrors.Wrapf(err, "can't decode primary replication position: %q", posStr)
+ return vterrors.Wrapf(err, "can't decode primary replication position: %q", primaryPos)
}
if !pos.Equal(primaryPos) {
diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go
index 1bd05493a59..6fe4e779cbb 100644
--- a/go/vt/vttablet/tabletmanager/rpc_replication.go
+++ b/go/vt/vttablet/tabletmanager/rpc_replication.go
@@ -31,6 +31,7 @@ import (
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
+ "vitess.io/vitess/go/vt/vttablet/tabletserver"
replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
@@ -354,7 +355,7 @@ func (tm *TabletManager) InitPrimary(ctx context.Context, semiSync bool) (string
// If semi-sync is enabled, we need to set two pc to be allowed.
// Otherwise, we block all Prepared calls because atomic transactions require semi-sync for correctness..
- tm.QueryServiceControl.SetTwoPCAllowed(semiSyncAction == SemiSyncActionSet)
+ tm.QueryServiceControl.SetTwoPCAllowed(tabletserver.TwoPCAllowed_SemiSync, semiSyncAction == SemiSyncActionSet)
// Setting super_read_only `OFF` so that we can run the DDL commands
if _, err := tm.MysqlDaemon.SetSuperReadOnly(ctx, false); err != nil {
@@ -600,7 +601,7 @@ func (tm *TabletManager) UndoDemotePrimary(ctx context.Context, semiSync bool) e
// If semi-sync is enabled, we need to set two pc to be allowed.
// Otherwise, we block all Prepared calls because atomic transactions require semi-sync for correctness..
- tm.QueryServiceControl.SetTwoPCAllowed(semiSyncAction == SemiSyncActionSet)
+ tm.QueryServiceControl.SetTwoPCAllowed(tabletserver.TwoPCAllowed_SemiSync, semiSyncAction == SemiSyncActionSet)
// If using semi-sync, we need to enable source-side.
if err := tm.fixSemiSync(ctx, topodatapb.TabletType_PRIMARY, semiSyncAction); err != nil {
@@ -707,6 +708,7 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA
wasReplicating := false
shouldbeReplicating := false
status, err := tm.MysqlDaemon.ReplicationStatus(ctx)
+ replicaPosition := status.RelayLogPosition
if err == mysql.ErrNotReplica {
// This is a special error that means we actually succeeded in reading
// the status, but the status is empty because replication is not
@@ -716,6 +718,12 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA
// Since we continue in the case of this error, make sure 'status' is
// in a known, empty state.
status = replication.ReplicationStatus{}
+ // The replica position we use for the errant GTID detection should be the executed
+ // GTID set since this tablet is not running replication at all.
+ replicaPosition, err = tm.MysqlDaemon.PrimaryPosition(ctx)
+ if err != nil {
+ return err
+ }
} else if err != nil {
// Abort on any other non-nil error.
return err
@@ -747,12 +755,35 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA
if err != nil {
return err
}
+
host := parent.Tablet.MysqlHostname
port := parent.Tablet.MysqlPort
// If host is empty, then we shouldn't even attempt the reparent. That tablet has already shutdown.
if host == "" {
return vterrors.New(vtrpc.Code_FAILED_PRECONDITION, "Shard primary has empty mysql hostname")
}
+ // Errant GTID detection.
+ {
+ // Find the executed GTID set of the tablet that we are reparenting to.
+ // We will then compare our own position against it to verify that we don't
+ // have an errant GTID. If we find any GTID that we have, but the primary doesn't,
+ // we will not enter the replication graph and instead fail replication.
+ primaryPositionStr, err := tm.tmc.PrimaryPosition(ctx, parent.Tablet)
+ if err != nil {
+ return err
+ }
+ primaryPosition, err := replication.DecodePosition(primaryPositionStr)
+ if err != nil {
+ return err
+ }
+ errantGtid, err := replication.ErrantGTIDsOnReplica(replicaPosition, primaryPosition)
+ if err != nil {
+ return err
+ }
+ if errantGtid != "" {
+ return vterrors.New(vtrpc.Code_FAILED_PRECONDITION, fmt.Sprintf("Errant GTID detected - %s; Primary GTID - %s, Replica GTID - %s", errantGtid, primaryPosition, replicaPosition.String()))
+ }
+ }
if status.SourceHost != host || status.SourcePort != port || heartbeatInterval != 0 {
// This handles both changing the address and starting replication.
if err := tm.MysqlDaemon.SetReplicationSource(ctx, host, port, heartbeatInterval, wasReplicating, shouldbeReplicating); err != nil {
@@ -925,7 +956,7 @@ func (tm *TabletManager) PromoteReplica(ctx context.Context, semiSync bool) (str
// If semi-sync is enabled, we need to set two pc to be allowed.
// Otherwise, we block all Prepared calls because atomic transactions require semi-sync for correctness..
- tm.QueryServiceControl.SetTwoPCAllowed(semiSyncAction == SemiSyncActionSet)
+ tm.QueryServiceControl.SetTwoPCAllowed(tabletserver.TwoPCAllowed_SemiSync, semiSyncAction == SemiSyncActionSet)
pos, err := tm.MysqlDaemon.Promote(ctx, tm.hookExtraEnv())
if err != nil {
diff --git a/go/vt/vttablet/tabletmanager/tm_init.go b/go/vt/vttablet/tabletmanager/tm_init.go
index 5ab49ebfbcf..efc6c57661a 100644
--- a/go/vt/vttablet/tabletmanager/tm_init.go
+++ b/go/vt/vttablet/tabletmanager/tm_init.go
@@ -50,6 +50,7 @@ import (
"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/mysql/collations"
+ "vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/protoutil"
@@ -64,6 +65,7 @@ import (
"vitess.io/vitess/go/vt/mysqlctl"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
+ "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
@@ -75,6 +77,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletserver"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
+ "vitess.io/vitess/go/vt/vttablet/tmclient"
)
const (
@@ -162,6 +165,9 @@ type TabletManager struct {
VDiffEngine *vdiff.Engine
Env *vtenv.Environment
+ // tmc is used to run an RPC against other vttablets.
+ tmc tmclient.TabletManagerClient
+
// tmState manages the TabletManager state.
tmState *tmState
@@ -356,6 +362,7 @@ func (tm *TabletManager) Start(tablet *topodatapb.Tablet, config *tabletenv.Tabl
log.Infof("TabletManager Start")
tm.DBConfigs.DBName = topoproto.TabletDbName(tablet)
tm.tabletAlias = tablet.Alias
+ tm.tmc = tmclient.NewTabletManagerClient()
tm.tmState = newTMState(tm, tablet)
tm.actionSema = semaphore.NewWeighted(1)
tm._waitForGrantsComplete = make(chan struct{})
@@ -549,13 +556,17 @@ func (tm *TabletManager) createKeyspaceShard(ctx context.Context) (*topo.ShardIn
return nil, err
}
- tm.tmState.RefreshFromTopoInfo(ctx, shardInfo, nil)
+ if err := tm.tmState.RefreshFromTopoInfo(ctx, shardInfo, nil); err != nil {
+ return nil, err
+ }
// Rebuild keyspace if this the first tablet in this keyspace/cell
srvKeyspace, err := tm.TopoServer.GetSrvKeyspace(ctx, tm.tabletAlias.Cell, tablet.Keyspace)
switch {
case err == nil:
- tm.tmState.RefreshFromTopoInfo(ctx, nil, srvKeyspace)
+ if err := tm.tmState.RefreshFromTopoInfo(ctx, nil, srvKeyspace); err != nil {
+ return nil, err
+ }
case topo.IsErrType(err, topo.NoNode):
var rebuildKsCtx context.Context
rebuildKsCtx, tm._rebuildKeyspaceCancel = context.WithCancel(tm.BatchCtx)
@@ -607,7 +618,10 @@ func (tm *TabletManager) rebuildKeyspace(ctx context.Context, done chan<- struct
defer func() {
log.Infof("Keyspace rebuilt: %v", keyspace)
if ctx.Err() == nil {
- tm.tmState.RefreshFromTopoInfo(tm.BatchCtx, nil, srvKeyspace)
+ err := tm.tmState.RefreshFromTopoInfo(tm.BatchCtx, nil, srvKeyspace)
+ if err != nil {
+ log.Errorf("Error refreshing topo information - %v", err)
+ }
}
close(done)
}()
@@ -951,50 +965,50 @@ func (tm *TabletManager) hookExtraEnv() map[string]string {
// initializeReplication is used to initialize the replication when the tablet starts.
// It returns the current primary tablet for use externally
-func (tm *TabletManager) initializeReplication(ctx context.Context, tabletType topodatapb.TabletType) (primary *topo.TabletInfo, err error) {
+func (tm *TabletManager) initializeReplication(ctx context.Context, tabletType topodatapb.TabletType) (primaryPosStr string, err error) {
// If active reparents are disabled, we do not touch replication.
// There is nothing to do
if mysqlctl.DisableActiveReparents {
- return nil, nil
+ return "", nil
}
// If the desired tablet type is primary, then we shouldn't be setting our replication source.
// So there is nothing to do.
if tabletType == topodatapb.TabletType_PRIMARY {
- return nil, nil
+ return "", nil
}
// Read the shard to find the current primary, and its location.
tablet := tm.Tablet()
si, err := tm.TopoServer.GetShard(ctx, tablet.Keyspace, tablet.Shard)
if err != nil {
- return nil, vterrors.Wrap(err, "cannot read shard")
+ return "", vterrors.Wrap(err, "cannot read shard")
}
if si.PrimaryAlias == nil {
// There's no primary. This is fine, since there might be no primary currently
log.Warningf("cannot start replication during initialization: shard %v/%v has no primary.", tablet.Keyspace, tablet.Shard)
- return nil, nil
+ return "", nil
}
if topoproto.TabletAliasEqual(si.PrimaryAlias, tablet.Alias) {
// We used to be the primary before we got restarted,
// and no other primary has been elected in the meantime.
// There isn't anything to do here either.
log.Warningf("cannot start replication during initialization: primary in shard record still points to this tablet.")
- return nil, nil
+ return "", nil
}
currentPrimary, err := tm.TopoServer.GetTablet(ctx, si.PrimaryAlias)
if err != nil {
- return nil, vterrors.Wrapf(err, "cannot read primary tablet %v", si.PrimaryAlias)
+ return "", vterrors.Wrapf(err, "cannot read primary tablet %v", si.PrimaryAlias)
}
durabilityName, err := tm.TopoServer.GetKeyspaceDurability(ctx, tablet.Keyspace)
if err != nil {
- return nil, vterrors.Wrapf(err, "cannot read keyspace durability policy %v", tablet.Keyspace)
+ return "", vterrors.Wrapf(err, "cannot read keyspace durability policy %v", tablet.Keyspace)
}
log.Infof("Getting a new durability policy for %v", durabilityName)
durability, err := reparentutil.GetDurabilityPolicy(durabilityName)
if err != nil {
- return nil, vterrors.Wrapf(err, "cannot get durability policy %v", durabilityName)
+ return "", vterrors.Wrapf(err, "cannot get durability policy %v", durabilityName)
}
// If using semi-sync, we need to enable it before connecting to primary.
// We should set the correct type, since it is used in replica semi-sync
@@ -1003,21 +1017,50 @@ func (tm *TabletManager) initializeReplication(ctx context.Context, tabletType t
semiSyncAction, err := tm.convertBoolToSemiSyncAction(ctx, reparentutil.IsReplicaSemiSync(durability, currentPrimary.Tablet, tablet))
if err != nil {
- return nil, err
+ return "", err
}
if err := tm.fixSemiSync(ctx, tabletType, semiSyncAction); err != nil {
- return nil, err
+ return "", err
}
// Set primary and start replication.
if currentPrimary.Tablet.MysqlHostname == "" {
log.Warningf("primary tablet in the shard record does not have mysql hostname specified, possibly because that tablet has been shut down.")
- return nil, nil
+ return "", nil
+ }
+
+ // Find our own executed GTID set and,
+ // the executed GTID set of the tablet that we are reparenting to.
+ // We will then compare our own position against it to verify that we don't
+ // have an errant GTID. If we find any GTID that we have, but the primary doesn't,
+ // we will not enter the replication graph and instead fail replication.
+ replicaPos, err := tm.MysqlDaemon.PrimaryPosition(ctx)
+ if err != nil {
+ return "", err
}
+
+ primaryPosStr, err = tm.tmc.PrimaryPosition(ctx, currentPrimary.Tablet)
+ if err != nil {
+ return "", err
+ }
+
+ primaryPosition, err := replication.DecodePosition(primaryPosStr)
+ if err != nil {
+ return "", err
+ }
+
+ errantGTIDs, err := replication.ErrantGTIDsOnReplica(replicaPos, primaryPosition)
+ if err != nil {
+ return "", err
+ }
+ if errantGTIDs != "" {
+ return "", vterrors.New(vtrpc.Code_FAILED_PRECONDITION, fmt.Sprintf("Errant GTID detected - %s", errantGTIDs))
+ }
+
if err := tm.MysqlDaemon.SetReplicationSource(ctx, currentPrimary.Tablet.MysqlHostname, currentPrimary.Tablet.MysqlPort, 0, true, true); err != nil {
- return nil, vterrors.Wrap(err, "MysqlDaemon.SetReplicationSource failed")
+ return "", vterrors.Wrap(err, "MysqlDaemon.SetReplicationSource failed")
}
- return currentPrimary, nil
+ return primaryPosStr, nil
}
diff --git a/go/vt/vttablet/tabletmanager/tm_state.go b/go/vt/vttablet/tabletmanager/tm_state.go
index cf56c515cfc..75917cb2065 100644
--- a/go/vt/vttablet/tabletmanager/tm_state.go
+++ b/go/vt/vttablet/tabletmanager/tm_state.go
@@ -37,6 +37,7 @@ import (
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vterrors"
+ "vitess.io/vitess/go/vt/vttablet/tabletserver"
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
@@ -135,11 +136,10 @@ func (ts *tmState) RefreshFromTopo(ctx context.Context) error {
if err != nil {
return err
}
- ts.RefreshFromTopoInfo(ctx, shardInfo, srvKeyspace)
- return nil
+ return ts.RefreshFromTopoInfo(ctx, shardInfo, srvKeyspace)
}
-func (ts *tmState) RefreshFromTopoInfo(ctx context.Context, shardInfo *topo.ShardInfo, srvKeyspace *topodatapb.SrvKeyspace) {
+func (ts *tmState) RefreshFromTopoInfo(ctx context.Context, shardInfo *topo.ShardInfo, srvKeyspace *topodatapb.SrvKeyspace) error {
ts.mu.Lock()
defer ts.mu.Unlock()
@@ -157,6 +157,7 @@ func (ts *tmState) RefreshFromTopoInfo(ctx context.Context, shardInfo *topo.Shar
if srvKeyspace != nil {
ts.isShardServing = make(map[topodatapb.TabletType]bool)
ts.tabletControls = make(map[topodatapb.TabletType]bool)
+ ts.tm.QueryServiceControl.SetTwoPCAllowed(tabletserver.TwoPCAllowed_TabletControls, true)
for _, partition := range srvKeyspace.GetPartitions() {
@@ -169,7 +170,10 @@ func (ts *tmState) RefreshFromTopoInfo(ctx context.Context, shardInfo *topo.Shar
for _, tabletControl := range partition.GetShardTabletControls() {
if key.KeyRangeEqual(tabletControl.GetKeyRange(), ts.KeyRange()) {
if tabletControl.QueryServiceDisabled {
- ts.tabletControls[partition.GetServedType()] = true
+ err := ts.prepareForDisableQueryService(ctx, partition.GetServedType())
+ if err != nil {
+ return err
+ }
}
break
}
@@ -177,7 +181,20 @@ func (ts *tmState) RefreshFromTopoInfo(ctx context.Context, shardInfo *topo.Shar
}
}
- _ = ts.updateLocked(ctx)
+ return ts.updateLocked(ctx)
+}
+
+// prepareForDisableQueryService prepares the tablet for disabling query service.
+func (ts *tmState) prepareForDisableQueryService(ctx context.Context, servType topodatapb.TabletType) error {
+ if servType == topodatapb.TabletType_PRIMARY {
+ ts.tm.QueryServiceControl.SetTwoPCAllowed(tabletserver.TwoPCAllowed_TabletControls, false)
+ err := ts.tm.QueryServiceControl.WaitForPreparedTwoPCTransactions(ctx)
+ if err != nil {
+ return err
+ }
+ }
+ ts.tabletControls[servType] = true
+ return nil
}
func (ts *tmState) ChangeTabletType(ctx context.Context, tabletType topodatapb.TabletType, action DBAction) error {
diff --git a/go/vt/vttablet/tabletserver/controller.go b/go/vt/vttablet/tabletserver/controller.go
index c54ab4e20dd..abb7b390e0d 100644
--- a/go/vt/vttablet/tabletserver/controller.go
+++ b/go/vt/vttablet/tabletserver/controller.go
@@ -97,8 +97,9 @@ type Controller interface {
// RedoPreparedTransactions recreates the transactions with stored prepared transaction log.
RedoPreparedTransactions()
- // SetTwoPCAllowed sets whether TwoPC is allowed or not.
- SetTwoPCAllowed(bool)
+ // SetTwoPCAllowed sets whether TwoPC is allowed or not. It also takes the reason of why it is being set.
+ // The reason should be an enum value defined in the tabletserver.
+ SetTwoPCAllowed(int, bool)
// UnresolvedTransactions returns all unresolved transactions list
UnresolvedTransactions(ctx context.Context, target *querypb.Target, abandonAgeSeconds int64) ([]*querypb.TransactionMetadata, error)
@@ -111,6 +112,9 @@ type Controller interface {
// RollbackPrepared rolls back the prepared transaction and removes the transaction log.
RollbackPrepared(ctx context.Context, target *querypb.Target, dtid string, originalID int64) error
+
+ // WaitForPreparedTwoPCTransactions waits for all prepared transactions to be resolved.
+ WaitForPreparedTwoPCTransactions(ctx context.Context) error
}
// Ensure TabletServer satisfies Controller interface.
diff --git a/go/vt/vttablet/tabletserver/dt_executor.go b/go/vt/vttablet/tabletserver/dt_executor.go
index b14e4d65d16..1aaf75edc9e 100644
--- a/go/vt/vttablet/tabletserver/dt_executor.go
+++ b/go/vt/vttablet/tabletserver/dt_executor.go
@@ -57,7 +57,7 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error {
if !dte.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
- if !dte.te.twopcAllowed {
+ if !dte.te.IsTwoPCAllowed() {
return vterrors.VT10002("two-pc is enabled, but semi-sync is not")
}
defer dte.te.env.Stats().QueryTimings.Record("PREPARE", time.Now())
diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go
index 90f6d9f3912..4936516be72 100644
--- a/go/vt/vttablet/tabletserver/tabletserver.go
+++ b/go/vt/vttablet/tabletserver/tabletserver.go
@@ -186,7 +186,7 @@ func NewTabletServer(ctx context.Context, env *vtenv.Environment, name string, c
tsv.messager = messager.NewEngine(tsv, tsv.se, tsv.vstreamer)
tsv.tableGC = gc.NewTableGC(tsv, topoServer, tsv.lagThrottler)
- tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tsv.lagThrottler, tabletTypeFunc, tsv.onlineDDLExecutorToggleTableBuffer, tsv.tableGC.RequestChecks, tsv.te.preparedPool.IsEmpty)
+ tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tsv.lagThrottler, tabletTypeFunc, tsv.onlineDDLExecutorToggleTableBuffer, tsv.tableGC.RequestChecks, tsv.te.preparedPool.IsEmptyForTable)
tsv.sm = &stateManager{
statelessql: tsv.statelessql,
@@ -700,6 +700,26 @@ func (tsv *TabletServer) RollbackPrepared(ctx context.Context, target *querypb.T
)
}
+// WaitForPreparedTwoPCTransactions waits for all the prepared transactions to complete.
+func (tsv *TabletServer) WaitForPreparedTwoPCTransactions(ctx context.Context) error {
+ if tsv.te.preparedPool.IsEmpty() {
+ return nil
+ }
+ ticker := time.NewTicker(100 * time.Millisecond)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ // Return an error if we run out of time.
+ return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Prepared transactions have not been resolved yet")
+ case <-ticker.C:
+ if tsv.te.preparedPool.IsEmpty() {
+ return nil
+ }
+ }
+ }
+}
+
// CreateTransaction creates the metadata for a 2PC transaction.
func (tsv *TabletServer) CreateTransaction(ctx context.Context, target *querypb.Target, dtid string, participants []*querypb.Target) (err error) {
return tsv.execRequest(
@@ -1699,9 +1719,10 @@ func (tsv *TabletServer) RedoPreparedTransactions() {
tsv.te.RedoPreparedTransactions()
}
-// SetTwoPCAllowed sets whether TwoPC is allowed or not.
-func (tsv *TabletServer) SetTwoPCAllowed(allowed bool) {
- tsv.te.twopcAllowed = allowed
+// SetTwoPCAllowed sets whether TwoPC is allowed or not. It also takes the reason of why it is being set.
+// The reason should be an enum value defined in the tabletserver.
+func (tsv *TabletServer) SetTwoPCAllowed(reason int, allowed bool) {
+ tsv.te.twopcAllowed[reason] = allowed
}
// HandlePanic is part of the queryservice.QueryService interface
diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go
index 549851790ed..42bec29dfa3 100644
--- a/go/vt/vttablet/tabletserver/tx_engine.go
+++ b/go/vt/vttablet/tabletserver/tx_engine.go
@@ -77,9 +77,11 @@ type TxEngine struct {
// twopcEnabled is the flag value of whether the user has enabled twopc or not.
twopcEnabled bool
- // twopcAllowed is wether it is safe to allow two pc transactions or not.
- // If the primary tablet doesn't run with semi-sync we set this to false, and disallow any prepared calls.
- twopcAllowed bool
+ // twopcAllowed is whether it is safe to allow two pc transactions or not.
+ // There are multiple reasons to disallow TwoPC:
+ // 1. If the primary tablet doesn't run with semi-sync we set this to false, and disallow any prepared calls.
+ // 2. TabletControls have been set in the tablet record, and Query service is going to be disabled.
+ twopcAllowed []bool
shutdownGracePeriod time.Duration
coordinatorAddress string
abandonAge time.Duration
@@ -94,6 +96,14 @@ type TxEngine struct {
dxNotify func()
}
+// TwoPC can be disallowed for various reasons. These are the reasons we keep track off
+// when deciding if new prepared transactions should be allowed or not.
+const (
+ TwoPCAllowed_SemiSync = iota
+ TwoPCAllowed_TabletControls
+ TwoPCAllowed_Len
+)
+
// NewTxEngine creates a new TxEngine.
func NewTxEngine(env tabletenv.Env, dxNotifier func()) *TxEngine {
config := env.Config()
@@ -105,8 +115,13 @@ func NewTxEngine(env tabletenv.Env, dxNotifier func()) *TxEngine {
limiter := txlimiter.New(env)
te.txPool = NewTxPool(env, limiter)
// We initially allow twoPC (handles vttablet restarts).
- // We will disallow them, when a new tablet is promoted if semi-sync is turned off.
- te.twopcAllowed = true
+ // We will disallow them for a few reasons -
+ // 1. when a new tablet is promoted if semi-sync is turned off.
+ // 2. TabletControls have been set by a Resharding workflow.
+ te.twopcAllowed = make([]bool, TwoPCAllowed_Len)
+ for idx := range te.twopcAllowed {
+ te.twopcAllowed[idx] = true
+ }
te.twopcEnabled = config.TwoPCEnable
if te.twopcEnabled {
if config.TwoPCAbandonAge <= 0 {
@@ -708,3 +723,13 @@ func (te *TxEngine) beginNewDbaConnection(ctx context.Context) (*StatefulConnect
_, _, err = te.txPool.begin(ctx, nil, false, sc, nil)
return sc, err
}
+
+// IsTwoPCAllowed checks if TwoPC is allowed.
+func (te *TxEngine) IsTwoPCAllowed() bool {
+ for _, allowed := range te.twopcAllowed {
+ if !allowed {
+ return false
+ }
+ }
+ return true
+}
diff --git a/go/vt/vttablet/tabletserver/tx_engine_test.go b/go/vt/vttablet/tabletserver/tx_engine_test.go
index 43916dab3c2..a9958525587 100644
--- a/go/vt/vttablet/tabletserver/tx_engine_test.go
+++ b/go/vt/vttablet/tabletserver/tx_engine_test.go
@@ -678,3 +678,46 @@ func TestCheckReceivedError(t *testing.T) {
})
}
}
+
+func TestIsTwoPCAllowed(t *testing.T) {
+ testcases := []struct {
+ semiSyncAllowed bool
+ tabletControllsAllowed bool
+ wantAllowed bool
+ }{
+ {
+ semiSyncAllowed: true,
+ tabletControllsAllowed: true,
+ wantAllowed: true,
+ },
+ {
+ semiSyncAllowed: false,
+ tabletControllsAllowed: true,
+ wantAllowed: false,
+ },
+ {
+ semiSyncAllowed: true,
+ tabletControllsAllowed: false,
+ wantAllowed: false,
+ },
+ {
+ semiSyncAllowed: false,
+ tabletControllsAllowed: false,
+ wantAllowed: false,
+ },
+ }
+
+ for _, tt := range testcases {
+ t.Run(fmt.Sprintf("SemiSyncAllowed - %v, TabletControlsAllowed - %v", tt.semiSyncAllowed, tt.tabletControllsAllowed), func(t *testing.T) {
+ te := &TxEngine{
+ twopcAllowed: []bool{true, true},
+ }
+ tsv := TabletServer{
+ te: te,
+ }
+ tsv.SetTwoPCAllowed(TwoPCAllowed_SemiSync, tt.semiSyncAllowed)
+ tsv.SetTwoPCAllowed(TwoPCAllowed_TabletControls, tt.tabletControllsAllowed)
+ require.Equal(t, tt.wantAllowed, te.IsTwoPCAllowed())
+ })
+ }
+}
diff --git a/go/vt/vttablet/tabletserver/tx_prep_pool.go b/go/vt/vttablet/tabletserver/tx_prep_pool.go
index 468c160c002..8e766062a92 100644
--- a/go/vt/vttablet/tabletserver/tx_prep_pool.go
+++ b/go/vt/vttablet/tabletserver/tx_prep_pool.go
@@ -173,7 +173,8 @@ func (pp *TxPreparedPool) FetchAllForRollback() []*StatefulConnection {
return conns
}
-func (pp *TxPreparedPool) IsEmpty(tableName string) bool {
+// IsEmptyForTable returns true if no prepared transactions are found for the table.
+func (pp *TxPreparedPool) IsEmptyForTable(tableName string) bool {
pp.mu.Lock()
defer pp.mu.Unlock()
if !pp.twoPCEnabled {
@@ -194,3 +195,17 @@ func (pp *TxPreparedPool) IsEmpty(tableName string) bool {
}
return true
}
+
+// IsEmpty returns true if the pool is empty.
+func (pp *TxPreparedPool) IsEmpty() bool {
+ pp.mu.Lock()
+ defer pp.mu.Unlock()
+ if !pp.twoPCEnabled {
+ return true
+ }
+ // If the pool is shutdown, we do not know the correct state of prepared transactions.
+ if !pp.open {
+ return false
+ }
+ return len(pp.conns) == 0
+}
diff --git a/go/vt/vttablet/tabletserver/tx_prep_pool_test.go b/go/vt/vttablet/tabletserver/tx_prep_pool_test.go
index cf6d2b61093..e8c889990f0 100644
--- a/go/vt/vttablet/tabletserver/tx_prep_pool_test.go
+++ b/go/vt/vttablet/tabletserver/tx_prep_pool_test.go
@@ -21,6 +21,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+
+ "vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
)
func TestEmptyPrep(t *testing.T) {
@@ -116,3 +118,135 @@ func createAndOpenPreparedPool(capacity int) *TxPreparedPool {
pp.Open()
return pp
}
+
+func TestTxPreparedPoolIsEmptyForTable(t *testing.T) {
+ tests := []struct {
+ name string
+ setupFunc func(pp *TxPreparedPool)
+ wantIsEmpty bool
+ }{
+ {
+ name: "Closed prepared pool",
+ setupFunc: func(pp *TxPreparedPool) {
+ pp.mu.Lock()
+ defer pp.mu.Unlock()
+ pp.open = false
+ },
+ wantIsEmpty: false,
+ },
+ {
+ name: "Two PC Disabled",
+ setupFunc: func(pp *TxPreparedPool) {
+ pp.mu.Lock()
+ defer pp.mu.Unlock()
+ pp.twoPCEnabled = false
+ },
+ wantIsEmpty: true,
+ },
+ {
+ name: "No prepared transactions",
+ setupFunc: func(pp *TxPreparedPool) {
+ pp.mu.Lock()
+ defer pp.mu.Unlock()
+ pp.open = true
+ },
+ wantIsEmpty: true,
+ },
+ {
+ name: "Prepared transactions for table t1",
+ setupFunc: func(pp *TxPreparedPool) {
+ pp.mu.Lock()
+ pp.open = true
+ pp.mu.Unlock()
+ pp.Put(&StatefulConnection{
+ txProps: &tx.Properties{
+ Queries: []tx.Query{
+ {
+ Tables: []string{"t1", "t2"},
+ },
+ },
+ },
+ }, "dtid1")
+ },
+ wantIsEmpty: false,
+ },
+ {
+ name: "Prepared transactions for other tables",
+ setupFunc: func(pp *TxPreparedPool) {
+ pp.mu.Lock()
+ pp.open = true
+ pp.mu.Unlock()
+ pp.Put(&StatefulConnection{
+ txProps: &tx.Properties{
+ Queries: []tx.Query{
+ {
+ Tables: []string{"t3", "t2"},
+ },
+ },
+ },
+ }, "dtid1")
+ },
+ wantIsEmpty: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ pp := NewTxPreparedPool(1, true)
+ tt.setupFunc(pp)
+ assert.Equalf(t, tt.wantIsEmpty, pp.IsEmptyForTable("t1"), "IsEmptyForTable()")
+ })
+ }
+}
+
+func TestTxPreparedPoolIsEmpty(t *testing.T) {
+ tests := []struct {
+ name string
+ setupFunc func(pp *TxPreparedPool)
+ wantIsEmpty bool
+ }{
+ {
+ name: "Closed prepared pool",
+ setupFunc: func(pp *TxPreparedPool) {
+ pp.mu.Lock()
+ defer pp.mu.Unlock()
+ pp.open = false
+ },
+ wantIsEmpty: false,
+ },
+ {
+ name: "Two PC Disabled",
+ setupFunc: func(pp *TxPreparedPool) {
+ pp.mu.Lock()
+ defer pp.mu.Unlock()
+ pp.twoPCEnabled = false
+ },
+ wantIsEmpty: true,
+ },
+ {
+ name: "No prepared transactions",
+ setupFunc: func(pp *TxPreparedPool) {
+ pp.mu.Lock()
+ defer pp.mu.Unlock()
+ pp.open = true
+ },
+ wantIsEmpty: true,
+ },
+ {
+ name: "Prepared transactions exist",
+ setupFunc: func(pp *TxPreparedPool) {
+ pp.mu.Lock()
+ pp.open = true
+ pp.mu.Unlock()
+ pp.Put(&StatefulConnection{}, "dtid1")
+ },
+ wantIsEmpty: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ pp := NewTxPreparedPool(1, true)
+ tt.setupFunc(pp)
+ assert.Equalf(t, tt.wantIsEmpty, pp.IsEmpty(), "IsEmpty()")
+ })
+ }
+}
diff --git a/go/vt/vttablet/tabletservermock/controller.go b/go/vt/vttablet/tabletservermock/controller.go
index bb0ce05ed19..9e97c033776 100644
--- a/go/vt/vttablet/tabletservermock/controller.go
+++ b/go/vt/vttablet/tabletservermock/controller.go
@@ -232,8 +232,9 @@ func (tqsc *Controller) GetThrottlerStatus(ctx context.Context) *throttle.Thrott
// RedoPreparedTransactions is part of the tabletserver.Controller interface
func (tqsc *Controller) RedoPreparedTransactions() {}
-// SetTwoPCAllowed sets whether TwoPC is allowed or not.
-func (tqsc *Controller) SetTwoPCAllowed(bool) {
+// SetTwoPCAllowed sets whether TwoPC is allowed or not. It also takes the reason of why it is being set.
+// The reason should be an enum value defined in the tabletserver.
+func (tqsc *Controller) SetTwoPCAllowed(int, bool) {
}
// UnresolvedTransactions is part of the tabletserver.Controller interface
@@ -260,6 +261,12 @@ func (tqsc *Controller) RollbackPrepared(context.Context, *querypb.Target, strin
return nil
}
+// WaitForPreparedTwoPCTransactions is part of the tabletserver.Controller interface
+func (tqsc *Controller) WaitForPreparedTwoPCTransactions(context.Context) error {
+ tqsc.MethodCalled["WaitForPreparedTwoPCTransactions"] = true
+ return nil
+}
+
// EnterLameduck implements tabletserver.Controller.
func (tqsc *Controller) EnterLameduck() {
tqsc.mu.Lock()
diff --git a/go/vt/wrangler/testlib/backup_test.go b/go/vt/wrangler/testlib/backup_test.go
index 0c7bc412f40..f1977df3f16 100644
--- a/go/vt/wrangler/testlib/backup_test.go
+++ b/go/vt/wrangler/testlib/backup_test.go
@@ -37,7 +37,6 @@ import (
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
"vitess.io/vitess/go/vt/mysqlctl/filebackupstorage"
- "vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtenv"
@@ -688,9 +687,6 @@ func TestRestoreUnreachablePrimary(t *testing.T) {
"FAKE RESET REPLICA ALL",
"FAKE RESET BINARY LOGS AND GTIDS",
"FAKE SET GLOBAL gtid_purged",
- "STOP REPLICA",
- "FAKE SET SOURCE",
- "START REPLICA",
}
destTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{
"SHOW DATABASES": {},
@@ -714,13 +710,16 @@ func TestRestoreUnreachablePrimary(t *testing.T) {
// stop primary so that it is unreachable
primary.StopActionLoop(t)
- // set a short timeout so that we don't have to wait 30 seconds
- topo.RemoteOperationTimeout = 2 * time.Second
- // Restore should still succeed
- require.NoError(t, destTablet.TM.RestoreData(ctx, logutil.NewConsoleLogger(), 0 /* waitForBackupInterval */, false /* deleteBeforeRestore */, time.Time{} /* restoreFromBackupTs */, time.Time{} /* restoreToTimestamp */, "" /* restoreToPos */, []string{} /* ignoreBackupEngines */, mysqlShutdownTimeout))
+ // Attempt to fix the test, but its still failing :man_shrugging.
+ ctx, cancel = context.WithTimeout(ctx, 2*time.Second)
+ defer cancel()
+ // Restore will return an error while trying to contact the primary for its position, but otherwise will succeed.
+ // The replication won't be running however, since we can't run errant GTID detection without the primary being online.
+ err = destTablet.TM.RestoreData(ctx, logutil.NewConsoleLogger(), 0 /* waitForBackupInterval */, false /* deleteBeforeRestore */, time.Time{} /* restoreFromBackupTs */, time.Time{} /* restoreToTimestamp */, "" /* restoreToPos */, []string{} /* ignoreBackupEngines */, mysqlShutdownTimeout)
+ require.ErrorContains(t, err, "DeadlineExceeded")
// verify the full status
require.NoError(t, destTablet.FakeMysqlDaemon.CheckSuperQueryList(), "destTablet.FakeMysqlDaemon.CheckSuperQueryList failed")
- assert.True(t, destTablet.FakeMysqlDaemon.Replicating)
+ assert.False(t, destTablet.FakeMysqlDaemon.Replicating)
assert.True(t, destTablet.FakeMysqlDaemon.Running)
}
diff --git a/go/vt/wrangler/testlib/reparent_utils_test.go b/go/vt/wrangler/testlib/reparent_utils_test.go
index e0a2077c778..ea2e34b66bd 100644
--- a/go/vt/wrangler/testlib/reparent_utils_test.go
+++ b/go/vt/wrangler/testlib/reparent_utils_test.go
@@ -205,6 +205,9 @@ func TestSetReplicationSource(t *testing.T) {
return nil
})
require.NoError(t, err, "UpdateShardFields failed")
+ pos, err := replication.DecodePosition("MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8")
+ require.NoError(t, err)
+ primary.FakeMysqlDaemon.CurrentPrimaryPositionLocked(pos)
// primary action loop (to initialize host and port)
primary.StartActionLoop(t, wr)
@@ -246,6 +249,36 @@ func TestSetReplicationSource(t *testing.T) {
checkSemiSyncEnabled(t, false, true, replica)
})
+ t.Run("Errant GTIDs on the replica", func(t *testing.T) {
+ replica := NewFakeTablet(t, wr, "cell1", 4, topodatapb.TabletType_REPLICA, nil)
+ // replica loop
+ replica.FakeMysqlDaemon.Replicating = true
+ replica.FakeMysqlDaemon.IOThreadRunning = true
+ replica.FakeMysqlDaemon.SetReplicationSourceInputs = append(replica.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet))
+ replica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
+ // These 3 statements come from tablet startup
+ "STOP REPLICA",
+ "FAKE SET SOURCE",
+ "START REPLICA",
+ }
+ replica.StartActionLoop(t, wr)
+ defer replica.StopActionLoop(t)
+
+ // Set replica's GTID to have a write that the primary's GTID doesn't have
+ pos, err = replication.DecodePosition("MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-7,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1")
+ require.NoError(t, err)
+ replica.FakeMysqlDaemon.CurrentRelayLogPosition = pos
+
+ // run SetReplicationSource
+ err = wr.SetReplicationSource(ctx, replica.Tablet)
+ require.ErrorContains(t, err, "Errant GTID detected")
+
+ // check what was run
+ err = replica.FakeMysqlDaemon.CheckSuperQueryList()
+ require.NoError(t, err, "CheckSuperQueryList failed")
+ checkSemiSyncEnabled(t, false, true, replica)
+ })
+
// test setting an empty hostname because of primary shutdown
t.Run("Primary tablet already shutdown", func(t *testing.T) {
replica := NewFakeTablet(t, wr, "cell1", 3, topodatapb.TabletType_REPLICA, nil)
diff --git a/go/vt/wrangler/workflow_test.go b/go/vt/wrangler/workflow_test.go
index 4f508766330..92996c9d931 100644
--- a/go/vt/wrangler/workflow_test.go
+++ b/go/vt/wrangler/workflow_test.go
@@ -325,6 +325,18 @@ func TestPartialMoveTables(t *testing.T) {
tme := newTestTablePartialMigrater(ctx, t, shards, shards[0:1], "select * %s")
defer tme.stopTablets(t)
+ // Add the schema for the primary tablets, so that we don't fail while applying the denied table rules.
+ schm := &tabletmanagerdatapb.SchemaDefinition{
+ TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
+ Name: "t1",
+ }, {
+ Name: "t2",
+ }},
+ }
+ for _, primary := range append(tme.sourcePrimaries, tme.targetPrimaries...) {
+ primary.FakeMysqlDaemon.Schema = schm
+ }
+
// Save some unrelated shard routing rules to be sure that
// they don't interfere in any way.
srr, err := tme.ts.GetShardRoutingRules(ctx)
@@ -400,6 +412,17 @@ func TestPartialMoveTablesShardSubset(t *testing.T) {
}
tme := newTestTablePartialMigrater(ctx, t, shards, shardsToMove, "select * %s")
defer tme.stopTablets(t)
+ // Add the schema for the primary tablets, so that we don't fail while applying the denied table rules.
+ schm := &tabletmanagerdatapb.SchemaDefinition{
+ TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
+ Name: "t1",
+ }, {
+ Name: "t2",
+ }},
+ }
+ for _, primary := range append(tme.sourcePrimaries, tme.targetPrimaries...) {
+ primary.FakeMysqlDaemon.Schema = schm
+ }
// Save some unrelated shard routing rules to be sure that
// they don't interfere in any way.
diff --git a/test/templates/cluster_vitess_tester.tpl b/test/templates/cluster_vitess_tester.tpl
index ae9191991fd..db492dad21b 100644
--- a/test/templates/cluster_vitess_tester.tpl
+++ b/test/templates/cluster_vitess_tester.tpl
@@ -110,7 +110,7 @@ jobs:
go install github.com/vitessio/go-junit-report@HEAD
# install vitess tester
- go install github.com/vitessio/vitess-tester@89dd933a9ea0e15f69ca58b9c8ea09a358762cca
+ go install github.com/vitessio/vitess-tester/go/vt@374fd9f495c1afd3b6bea9d4ec7728119714055
- name: Setup launchable dependencies
if: steps.skip-workflow.outputs.is_draft == 'false' && steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && github.base_ref == 'main'
@@ -142,9 +142,9 @@ jobs:
# We go over all the directories in the given path.
# If there is a vschema file there, we use it, otherwise we let vitess-tester autogenerate it.
if [ -f $dir/vschema.json ]; then
- vitess-tester --xunit --vschema "$dir"vschema.json $dir/*.test
+ vt tester --xunit --vschema "$dir"vschema.json $dir/*.test
else
- vitess-tester --sharded --xunit $dir/*.test
+ vt tester --sharded --xunit $dir/*.test
fi
# Number the reports by changing their file names.
mv report.xml report"$i".xml