From 3c2e8f94153f7e2ad697d52fdf42bab8673f69aa Mon Sep 17 00:00:00 2001 From: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com> Date: Mon, 12 Aug 2024 13:00:47 +0530 Subject: [PATCH] Atomic Transaction bug fix with PRS disruption (#16576) Signed-off-by: Manan Gupta --- go/test/endtoend/cluster/cluster_process.go | 24 ++-- go/test/endtoend/cluster/vtctl_process.go | 16 +-- .../encrypted_replication_test.go | 2 +- .../encrypted_transport_test.go | 2 +- go/test/endtoend/mysqlctl/mysqlctl_test.go | 2 +- go/test/endtoend/mysqlctld/mysqlctld_test.go | 2 +- .../endtoend/sharded/sharded_keyspace_test.go | 2 +- .../transaction/twopc/fuzzer/main_test.go | 28 +--- .../endtoend/transaction/twopc/main_test.go | 22 ++- go/test/endtoend/transaction/twopc/schema.sql | 6 + .../endtoend/transaction/twopc/twopc_test.go | 126 ++++++++++++++++++ .../endtoend/transaction/twopc/utils/utils.go | 59 ++++++++ .../endtoend/transaction/twopc/vschema.json | 11 ++ go/vt/vterrors/code.go | 1 + go/vt/vttablet/tabletserver/debug_2pc.go | 48 +++++++ go/vt/vttablet/tabletserver/dt_executor.go | 2 +- .../vttablet/tabletserver/dt_executor_test.go | 2 +- go/vt/vttablet/tabletserver/production.go | 30 +++++ go/vt/vttablet/tabletserver/tabletserver.go | 3 + go/vt/vttablet/tabletserver/tx_engine.go | 4 +- go/vt/vttablet/tabletserver/tx_prep_pool.go | 30 +++-- .../tabletserver/tx_prep_pool_test.go | 33 ++--- 22 files changed, 359 insertions(+), 96 deletions(-) create mode 100644 go/test/endtoend/transaction/twopc/utils/utils.go create mode 100644 go/vt/vttablet/tabletserver/debug_2pc.go create mode 100644 go/vt/vttablet/tabletserver/production.go diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go index 44636b3cdb6..95995903a83 100644 --- a/go/test/endtoend/cluster/cluster_process.go +++ b/go/test/endtoend/cluster/cluster_process.go @@ -151,11 +151,12 @@ type Vttablet struct { // Keyspace : Cluster accepts keyspace to launch it type Keyspace struct { - Name string - SchemaSQL string - VSchema string - SidecarDBName string - Shards []Shard + Name string + SchemaSQL string + VSchema string + SidecarDBName string + DurabilityPolicy string + Shards []Shard } // Shard with associated vttablets @@ -284,9 +285,10 @@ func (cluster *LocalProcessCluster) startPartialKeyspace(keyspace Keyspace, shar cluster.HasPartialKeyspaces = true routedKeyspace := &Keyspace{ - Name: fmt.Sprintf("%s_routed", keyspace.Name), - SchemaSQL: keyspace.SchemaSQL, - VSchema: keyspace.VSchema, + Name: fmt.Sprintf("%s_routed", keyspace.Name), + SchemaSQL: keyspace.SchemaSQL, + VSchema: keyspace.VSchema, + DurabilityPolicy: keyspace.DurabilityPolicy, } err = cluster.startKeyspace(*routedKeyspace, shardNames, replicaCount, rdonly, customizers...) @@ -374,7 +376,7 @@ func (cluster *LocalProcessCluster) startKeyspace(keyspace Keyspace, shardNames keyspace.SidecarDBName = sidecar.DefaultName } // Create the keyspace if it doesn't already exist. - _ = cluster.VtctlProcess.CreateKeyspace(keyspace.Name, keyspace.SidecarDBName) + _ = cluster.VtctlProcess.CreateKeyspace(keyspace.Name, keyspace.SidecarDBName, keyspace.DurabilityPolicy) for _, shardName := range shardNames { shard := &Shard{ Name: shardName, @@ -538,7 +540,7 @@ func (cluster *LocalProcessCluster) StartKeyspaceLegacy(keyspace Keyspace, shard keyspace.SidecarDBName = sidecar.DefaultName } // Create the keyspace if it doesn't already exist. - _ = cluster.VtctlProcess.CreateKeyspace(keyspace.Name, keyspace.SidecarDBName) + _ = cluster.VtctlProcess.CreateKeyspace(keyspace.Name, keyspace.SidecarDBName, keyspace.DurabilityPolicy) var mysqlctlProcessList []*exec.Cmd for _, shardName := range shardNames { shard := &Shard{ @@ -681,7 +683,7 @@ func (cluster *LocalProcessCluster) SetupCluster(keyspace *Keyspace, shards []Sh if !cluster.ReusingVTDATAROOT { // Create Keyspace - err = cluster.VtctlProcess.CreateKeyspace(keyspace.Name, keyspace.SidecarDBName) + err = cluster.VtctlProcess.CreateKeyspace(keyspace.Name, keyspace.SidecarDBName, keyspace.DurabilityPolicy) if err != nil { log.Error(err) return diff --git a/go/test/endtoend/cluster/vtctl_process.go b/go/test/endtoend/cluster/vtctl_process.go index b9d8a5b46ce..185c3079d34 100644 --- a/go/test/endtoend/cluster/vtctl_process.go +++ b/go/test/endtoend/cluster/vtctl_process.go @@ -60,15 +60,15 @@ func (vtctl *VtctlProcess) AddCellInfo(Cell string) (err error) { } // CreateKeyspace executes vtctl command to create keyspace -func (vtctl *VtctlProcess) CreateKeyspace(keyspace, sidecarDBName string) (err error) { - var output string - // For upgrade/downgrade tests where an older version is also used. - if vtctl.VtctlMajorVersion < 17 { - log.Errorf("CreateKeyspace does not support the --sidecar-db-name flag in vtctl version %d; ignoring...", vtctl.VtctlMajorVersion) - output, err = vtctl.ExecuteCommandWithOutput("CreateKeyspace", keyspace) - } else { - output, err = vtctl.ExecuteCommandWithOutput("CreateKeyspace", keyspace, "--sidecar-db-name", sidecarDBName) +func (vtctl *VtctlProcess) CreateKeyspace(keyspace, sidecarDBName, durabilityPolicy string) error { + args := []string{ + "CreateKeyspace", keyspace, + "--sidecar-db-name", sidecarDBName, } + if durabilityPolicy != "" { + args = append(args, "--durability-policy", durabilityPolicy) + } + output, err := vtctl.ExecuteCommandWithOutput(args...) if err != nil { log.Errorf("CreateKeyspace returned err: %s, output: %s", err, output) } diff --git a/go/test/endtoend/encryption/encryptedreplication/encrypted_replication_test.go b/go/test/endtoend/encryption/encryptedreplication/encrypted_replication_test.go index 4c759ff577a..7dea6cf525f 100644 --- a/go/test/endtoend/encryption/encryptedreplication/encrypted_replication_test.go +++ b/go/test/endtoend/encryption/encryptedreplication/encrypted_replication_test.go @@ -131,7 +131,7 @@ func initializeCluster(t *testing.T) (int, error) { for _, keyspaceStr := range []string{keyspace} { KeyspacePtr := &cluster.Keyspace{Name: keyspaceStr} keyspace := *KeyspacePtr - if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspace.Name, sidecar.DefaultName); err != nil { + if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspace.Name, sidecar.DefaultName, ""); err != nil { return 1, err } shard := &cluster.Shard{ diff --git a/go/test/endtoend/encryption/encryptedtransport/encrypted_transport_test.go b/go/test/endtoend/encryption/encryptedtransport/encrypted_transport_test.go index 9147b7b9080..1363e07b2cd 100644 --- a/go/test/endtoend/encryption/encryptedtransport/encrypted_transport_test.go +++ b/go/test/endtoend/encryption/encryptedtransport/encrypted_transport_test.go @@ -350,7 +350,7 @@ func clusterSetUp(t *testing.T) (int, error) { for _, keyspaceStr := range []string{keyspace} { KeyspacePtr := &cluster.Keyspace{Name: keyspaceStr} keyspace := *KeyspacePtr - if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspace.Name, sidecar.DefaultName); err != nil { + if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspace.Name, sidecar.DefaultName, ""); err != nil { return 1, err } shard := &cluster.Shard{ diff --git a/go/test/endtoend/mysqlctl/mysqlctl_test.go b/go/test/endtoend/mysqlctl/mysqlctl_test.go index 6c3d65226e3..f93724fa4a8 100644 --- a/go/test/endtoend/mysqlctl/mysqlctl_test.go +++ b/go/test/endtoend/mysqlctl/mysqlctl_test.go @@ -53,7 +53,7 @@ func TestMain(m *testing.M) { return 1 } - if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspaceName, sidecar.DefaultName); err != nil { + if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspaceName, sidecar.DefaultName, ""); err != nil { return 1 } diff --git a/go/test/endtoend/mysqlctld/mysqlctld_test.go b/go/test/endtoend/mysqlctld/mysqlctld_test.go index 328bc563377..beb155830e2 100644 --- a/go/test/endtoend/mysqlctld/mysqlctld_test.go +++ b/go/test/endtoend/mysqlctld/mysqlctld_test.go @@ -57,7 +57,7 @@ func TestMain(m *testing.M) { return 1 } - if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspaceName, sidecar.DefaultName); err != nil { + if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspaceName, sidecar.DefaultName, ""); err != nil { return 1 } diff --git a/go/test/endtoend/sharded/sharded_keyspace_test.go b/go/test/endtoend/sharded/sharded_keyspace_test.go index f311404ad7e..192355fa6ef 100644 --- a/go/test/endtoend/sharded/sharded_keyspace_test.go +++ b/go/test/endtoend/sharded/sharded_keyspace_test.go @@ -84,7 +84,7 @@ func TestMain(m *testing.M) { if err := clusterInstance.StartTopo(); err != nil { return 1, err } - if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspaceName, sidecar.DefaultName); err != nil { + if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspaceName, sidecar.DefaultName, ""); err != nil { return 1, err } diff --git a/go/test/endtoend/transaction/twopc/fuzzer/main_test.go b/go/test/endtoend/transaction/twopc/fuzzer/main_test.go index 5e1d14d77e4..e0affde186a 100644 --- a/go/test/endtoend/transaction/twopc/fuzzer/main_test.go +++ b/go/test/endtoend/transaction/twopc/fuzzer/main_test.go @@ -28,6 +28,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils" ) var ( @@ -110,29 +111,6 @@ func start(t *testing.T) (*mysql.Conn, func()) { func cleanup(t *testing.T) { cluster.PanicHandler(t) - ctx := context.Background() - conn, err := mysql.Connect(ctx, &vtParams) - require.NoError(t, err) - defer conn.Close() - - clearOutTable(t, conn, "twopc_fuzzer_insert") - clearOutTable(t, conn, "twopc_fuzzer_update") -} - -// clearOutTable deletes everything from a table. Sometimes the table might have more rows than allowed in a single delete query, -// so we have to do the deletions iteratively. -func clearOutTable(t *testing.T, conn *mysql.Conn, tableName string) { - for { - res, err := conn.ExecuteFetch(fmt.Sprintf("SELECT count(*) FROM %v", tableName), 1, false) - require.NoError(t, err) - require.Len(t, res.Rows, 1) - require.Len(t, res.Rows[0], 1) - rowCount, err := res.Rows[0][0].ToInt() - require.NoError(t, err) - if rowCount == 0 { - return - } - _, err = conn.ExecuteFetch(fmt.Sprintf("DELETE FROM %v LIMIT 10000", tableName), 10000, false) - require.NoError(t, err) - } + utils.ClearOutTable(t, vtParams, "twopc_fuzzer_insert") + utils.ClearOutTable(t, vtParams, "twopc_fuzzer_update") } diff --git a/go/test/endtoend/transaction/twopc/main_test.go b/go/test/endtoend/transaction/twopc/main_test.go index 8ac7cfc1f21..4c5e2715563 100644 --- a/go/test/endtoend/transaction/twopc/main_test.go +++ b/go/test/endtoend/transaction/twopc/main_test.go @@ -33,7 +33,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/endtoend/cluster" - "vitess.io/vitess/go/test/endtoend/utils" + "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -85,12 +85,13 @@ func TestMain(m *testing.M) { // Start keyspace keyspace := &cluster.Keyspace{ - Name: keyspaceName, - SchemaSQL: SchemaSQL, - VSchema: VSchema, - SidecarDBName: sidecarDBName, + Name: keyspaceName, + SchemaSQL: SchemaSQL, + VSchema: VSchema, + SidecarDBName: sidecarDBName, + DurabilityPolicy: "semi_sync", } - if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 0, false); err != nil { + if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false); err != nil { return 1 } @@ -119,13 +120,8 @@ func start(t *testing.T) (*mysql.Conn, func()) { func cleanup(t *testing.T) { cluster.PanicHandler(t) - - ctx := context.Background() - conn, err := mysql.Connect(ctx, &vtParams) - require.NoError(t, err) - defer conn.Close() - - _, _ = utils.ExecAllowError(t, conn, "delete from twopc_user") + utils.ClearOutTable(t, vtParams, "twopc_user") + utils.ClearOutTable(t, vtParams, "twopc_t1") } type extractInterestingValues func(dtidMap map[string]string, vals []sqltypes.Value) []sqltypes.Value diff --git a/go/test/endtoend/transaction/twopc/schema.sql b/go/test/endtoend/transaction/twopc/schema.sql index 60a7c19837c..de9e3ef0656 100644 --- a/go/test/endtoend/transaction/twopc/schema.sql +++ b/go/test/endtoend/transaction/twopc/schema.sql @@ -9,4 +9,10 @@ create table twopc_music ( user_id bigint, title varchar(64), primary key (id) +) Engine=InnoDB; + +create table twopc_t1 ( + id bigint, + col bigint, + primary key (id, col) ) Engine=InnoDB; \ No newline at end of file diff --git a/go/test/endtoend/transaction/twopc/twopc_test.go b/go/test/endtoend/transaction/twopc/twopc_test.go index 98bc158c4da..53c1780f373 100644 --- a/go/test/endtoend/transaction/twopc/twopc_test.go +++ b/go/test/endtoend/transaction/twopc/twopc_test.go @@ -20,6 +20,8 @@ import ( "context" _ "embed" "fmt" + "os" + "path" "reflect" "sort" "strings" @@ -35,11 +37,17 @@ import ( "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/utils" "vitess.io/vitess/go/vt/callerid" + "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/vtgate/vtgateconn" ) +const ( + DebugDelayCommitShard = "VT_DELAY_COMMIT_SHARD" + DebugDelayCommitTime = "VT_DELAY_COMMIT_TIME" +) + // TestDTCommit tests distributed transaction commit for insert, update and delete operations // It verifies the binlog events for the same with transaction state changes and redo statements. func TestDTCommit(t *testing.T) { @@ -955,3 +963,121 @@ func testWarningAndTransactionStatus(t *testing.T, conn *vtgateconn.VTGateSessio assert.Equal(t, txParticipants, tx.participants) } } + +// TestDisruptions tests that atomic transactions persevere through various disruptions. +func TestDisruptions(t *testing.T) { + testcases := []struct { + disruptionName string + commitDelayTime string + disruption func() error + }{ + { + disruptionName: "No Disruption", + commitDelayTime: "1", + disruption: func() error { + return nil + }, + }, + { + disruptionName: "PlannedReparentShard", + commitDelayTime: "5", + disruption: prsShard3, + }, + } + for _, tt := range testcases { + t.Run(fmt.Sprintf("%s-%ss timeout", tt.disruptionName, tt.commitDelayTime), func(t *testing.T) { + // Reparent all the shards to first tablet being the primary. + reparentToFistTablet(t) + // cleanup all the old data. + conn, closer := start(t) + defer closer() + // Start an atomic transaction. + utils.Exec(t, conn, "begin") + // Insert rows such that they go to all the three shards. Given that we have sharded the table `twopc_t1` on reverse_bits + // it is very easy to figure out what value will end up in which shard. + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(4, 4)") + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(6, 4)") + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(9, 4)") + // We want to delay the commit on one of the shards to simulate slow commits on a shard. + writeTestCommunicationFile(t, DebugDelayCommitShard, "80-") + defer deleteFile(DebugDelayCommitShard) + writeTestCommunicationFile(t, DebugDelayCommitTime, tt.commitDelayTime) + defer deleteFile(DebugDelayCommitTime) + // We will execute a commit in a go routine, because we know it will take some time to complete. + // While the commit is ongoing, we would like to run the disruption. + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _, err := utils.ExecAllowError(t, conn, "commit") + if err != nil { + log.Errorf("Error in commit - %v", err) + } + }() + // Allow enough time for the commit to have started. + time.Sleep(1 * time.Second) + // Run the disruption. + err := tt.disruption() + require.NoError(t, err) + // Wait for the commit to have returned. We don't actually check for an error in the commit because the user might receive an error. + // But since we are waiting in CommitPrepared, the decision to commit the transaction should have already been taken. + wg.Wait() + // Check the data in the table. + waitForResults(t, "select id, col from twopc_t1 where col = 4 order by id", `[[INT64(4) INT64(4)] [INT64(6) INT64(4)] [INT64(9) INT64(4)]]`, 10*time.Second) + }) + } +} + +// reparentToFistTablet reparents all the shards to first tablet being the primary. +func reparentToFistTablet(t *testing.T) { + ks := clusterInstance.Keyspaces[0] + for _, shard := range ks.Shards { + primary := shard.Vttablets[0] + err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, primary.Alias) + require.NoError(t, err) + } +} + +// writeTestCommunicationFile writes the content to the file with the given name. +// We use these files to coordinate with the vttablets running in the debug mode. +func writeTestCommunicationFile(t *testing.T, fileName string, content string) { + err := os.WriteFile(path.Join(os.Getenv("VTDATAROOT"), fileName), []byte(content), 0644) + require.NoError(t, err) +} + +// deleteFile deletes the file specified. +func deleteFile(fileName string) { + _ = os.Remove(path.Join(os.Getenv("VTDATAROOT"), fileName)) +} + +// waitForResults waits for the results of the query to be as expected. +func waitForResults(t *testing.T, query string, resultExpected string, waitTime time.Duration) { + timeout := time.After(waitTime) + for { + select { + case <-timeout: + t.Fatalf("didn't reach expected results for %s", query) + default: + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + res := utils.Exec(t, conn, query) + conn.Close() + if fmt.Sprintf("%v", res.Rows) == resultExpected { + return + } + time.Sleep(100 * time.Millisecond) + } + } +} + +/* +Cluster Level Disruptions for the fuzzer +*/ + +// prsShard3 runs a PRS in shard 3 of the keyspace. It promotes the second tablet to be the new primary. +func prsShard3() error { + shard := clusterInstance.Keyspaces[0].Shards[2] + newPrimary := shard.Vttablets[1] + return clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, newPrimary.Alias) +} diff --git a/go/test/endtoend/transaction/twopc/utils/utils.go b/go/test/endtoend/transaction/twopc/utils/utils.go new file mode 100644 index 00000000000..7311375ee55 --- /dev/null +++ b/go/test/endtoend/transaction/twopc/utils/utils.go @@ -0,0 +1,59 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/log" +) + +// ClearOutTable deletes everything from a table. Sometimes the table might have more rows than allowed in a single delete query, +// so we have to do the deletions iteratively. +func ClearOutTable(t *testing.T, vtParams mysql.ConnParams, tableName string) { + ctx := context.Background() + for { + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + + res, err := conn.ExecuteFetch(fmt.Sprintf("SELECT count(*) FROM %v", tableName), 1, false) + if err != nil { + log.Errorf("Error in selecting - %v", err) + conn.Close() + continue + } + require.Len(t, res.Rows, 1) + require.Len(t, res.Rows[0], 1) + rowCount, err := res.Rows[0][0].ToInt() + require.NoError(t, err) + if rowCount == 0 { + conn.Close() + return + } + _, err = conn.ExecuteFetch(fmt.Sprintf("DELETE FROM %v LIMIT 10000", tableName), 10000, false) + if err != nil { + log.Errorf("Error in cleanup deletion - %v", err) + conn.Close() + continue + } + } +} diff --git a/go/test/endtoend/transaction/twopc/vschema.json b/go/test/endtoend/transaction/twopc/vschema.json index 4ff62df6808..bca58b05c1e 100644 --- a/go/test/endtoend/transaction/twopc/vschema.json +++ b/go/test/endtoend/transaction/twopc/vschema.json @@ -3,6 +3,9 @@ "vindexes": { "xxhash": { "type": "xxhash" + }, + "reverse_bits": { + "type": "reverse_bits" } }, "tables": { @@ -21,6 +24,14 @@ "name": "xxhash" } ] + }, + "twopc_t1": { + "column_vindexes": [ + { + "column": "id", + "name": "reverse_bits" + } + ] } } } \ No newline at end of file diff --git a/go/vt/vterrors/code.go b/go/vt/vterrors/code.go index 857ba538ebe..83a87503265 100644 --- a/go/vt/vterrors/code.go +++ b/go/vt/vterrors/code.go @@ -96,6 +96,7 @@ var ( VT09022 = errorWithoutState("VT09022", vtrpcpb.Code_FAILED_PRECONDITION, "Destination does not have exactly one shard: %v", "Cannot send query to multiple shards.") VT09023 = errorWithoutState("VT09023", vtrpcpb.Code_FAILED_PRECONDITION, "could not map %v to a keyspace id", "Unable to determine the shard for the given row.") VT09024 = errorWithoutState("VT09024", vtrpcpb.Code_FAILED_PRECONDITION, "could not map %v to a unique keyspace id: %v", "Unable to determine the shard for the given row.") + VT09025 = errorWithoutState("VT09025", vtrpcpb.Code_FAILED_PRECONDITION, "atomic transaction error: %v", "Error in atomic transactions") VT10001 = errorWithoutState("VT10001", vtrpcpb.Code_ABORTED, "foreign key constraints are not allowed", "Foreign key constraints are not allowed, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/.") VT10002 = errorWithoutState("VT10002", vtrpcpb.Code_ABORTED, "atomic distributed transaction not allowed: %s", "The distributed transaction cannot be committed. A rollback decision is taken.") diff --git a/go/vt/vttablet/tabletserver/debug_2pc.go b/go/vt/vttablet/tabletserver/debug_2pc.go new file mode 100644 index 00000000000..a0de20104db --- /dev/null +++ b/go/vt/vttablet/tabletserver/debug_2pc.go @@ -0,0 +1,48 @@ +//go:build debug2PC + +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tabletserver + +import ( + "os" + "path" + "strconv" + "time" + + "vitess.io/vitess/go/vt/log" +) + +const DebugTwoPc = true + +// readFileForTestSynchronization is a test-only function that reads a file +// that we use for synchronizing some of the tests. +func readFileForTestSynchronization(fileName string) string { + res, _ := os.ReadFile(path.Join(os.Getenv("VTDATAROOT"), fileName)) + return string(res) +} + +// commitPreparedDelayForTest is a test-only function that delays the commit that have already been prepared. +func commitPreparedDelayForTest(tsv *TabletServer) { + sh := readFileForTestSynchronization("VT_DELAY_COMMIT_SHARD") + if tsv.sm.target.Shard == sh { + delay := readFileForTestSynchronization("VT_DELAY_COMMIT_TIME") + delVal, _ := strconv.Atoi(delay) + log.Infof("Delaying commit for shard %v for %d seconds", sh, delVal) + time.Sleep(time.Duration(delVal) * time.Second) + } +} diff --git a/go/vt/vttablet/tabletserver/dt_executor.go b/go/vt/vttablet/tabletserver/dt_executor.go index 5f4e7644766..9ddca3247a3 100644 --- a/go/vt/vttablet/tabletserver/dt_executor.go +++ b/go/vt/vttablet/tabletserver/dt_executor.go @@ -96,7 +96,7 @@ func (dte *DTExecutor) CommitPrepared(dtid string) (err error) { var conn *StatefulConnection conn, err = dte.te.preparedPool.FetchForCommit(dtid) if err != nil { - return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot commit dtid %s, state: %v", dtid, err) + return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot commit dtid %s, err: %v", dtid, err) } // No connection means the transaction was already committed. if conn == nil { diff --git a/go/vt/vttablet/tabletserver/dt_executor_test.go b/go/vt/vttablet/tabletserver/dt_executor_test.go index 045496eb4b8..fb45ab454fc 100644 --- a/go/vt/vttablet/tabletserver/dt_executor_test.go +++ b/go/vt/vttablet/tabletserver/dt_executor_test.go @@ -228,7 +228,7 @@ func TestTxExecutorCommitRedoFail(t *testing.T) { // A retry should fail differently as the prepared transaction is marked as failed. err = txe.CommitPrepared("bb") require.Error(t, err) - require.Contains(t, err.Error(), "cannot commit dtid bb, state: failed") + require.Contains(t, err.Error(), "cannot commit dtid bb, err: VT09025: atomic transaction error: failed to commit") require.Contains(t, strings.Join(tl.GetAllLogs(), "|"), "failed to commit the prepared transaction 'bb' with error: unknown error: delete redo log fail") diff --git a/go/vt/vttablet/tabletserver/production.go b/go/vt/vttablet/tabletserver/production.go new file mode 100644 index 00000000000..70cb8b092fa --- /dev/null +++ b/go/vt/vttablet/tabletserver/production.go @@ -0,0 +1,30 @@ +//go:build !debug2PC + +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tabletserver + +// This file defines debug constants that are always false. +// This file is used for building production code. +// We use go build directives to include a file that defines the constant to true +// when certain tags are provided while building binaries. +// This allows to have debugging code written in normal code flow without affecting +// production performance. + +const DebugTwoPc = false + +func commitPreparedDelayForTest(tsv *TabletServer) {} diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 167d55a4e6f..e3e951892b7 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -660,6 +660,9 @@ func (tsv *TabletServer) CommitPrepared(ctx context.Context, target *querypb.Tar target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { txe := NewDTExecutor(ctx, tsv.te, logStats) + if DebugTwoPc { + commitPreparedDelayForTest(tsv) + } return txe.CommitPrepared(dtid) }, ) diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go index 57c6ff1fd64..33e22e321bc 100644 --- a/go/vt/vttablet/tabletserver/tx_engine.go +++ b/go/vt/vttablet/tabletserver/tx_engine.go @@ -160,6 +160,8 @@ func (te *TxEngine) transition(state txEngineState) { te.txPool.Open(te.env.Config().DB.AppWithDB(), te.env.Config().DB.DbaWithDB(), te.env.Config().DB.AppDebugWithDB()) if te.twopcEnabled && te.state == AcceptingReadAndWrite { + // Set the preparedPool to start accepting connections. + te.preparedPool.shutdown = false // If there are errors, we choose to raise an alert and // continue anyway. Serving traffic is considered more important // than blocking everything for the sake of a few transactions. @@ -442,7 +444,7 @@ func (te *TxEngine) shutdownTransactions() { func (te *TxEngine) rollbackPrepared() { ctx := tabletenv.LocalContext() - for _, conn := range te.preparedPool.FetchAll() { + for _, conn := range te.preparedPool.FetchAllForRollback() { te.txPool.Rollback(ctx, conn) conn.Release(tx.TxRollback) } diff --git a/go/vt/vttablet/tabletserver/tx_prep_pool.go b/go/vt/vttablet/tabletserver/tx_prep_pool.go index 22e0ce295c0..d5376172856 100644 --- a/go/vt/vttablet/tabletserver/tx_prep_pool.go +++ b/go/vt/vttablet/tabletserver/tx_prep_pool.go @@ -17,14 +17,16 @@ limitations under the License. package tabletserver import ( - "errors" "fmt" "sync" + + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" ) var ( - errPrepCommitting = errors.New("locked for committing") - errPrepFailed = errors.New("failed to commit") + errPrepCommitting = vterrors.VT09025("locked for committing") + errPrepFailed = vterrors.VT09025("failed to commit") ) // TxPreparedPool manages connections for prepared transactions. @@ -34,6 +36,8 @@ type TxPreparedPool struct { mu sync.Mutex conns map[string]*StatefulConnection reserved map[string]error + // shutdown tells if the prepared pool has been drained and shutdown. + shutdown bool capacity int } @@ -55,14 +59,18 @@ func NewTxPreparedPool(capacity int) *TxPreparedPool { func (pp *TxPreparedPool) Put(c *StatefulConnection, dtid string) error { pp.mu.Lock() defer pp.mu.Unlock() + // If the pool is shutdown, we don't accept new prepared transactions. + if pp.shutdown { + return vterrors.VT09025("pool is shutdown") + } if _, ok := pp.reserved[dtid]; ok { - return errors.New("duplicate DTID in Prepare: " + dtid) + return vterrors.VT09025("duplicate DTID in Prepare: " + dtid) } if _, ok := pp.conns[dtid]; ok { - return errors.New("duplicate DTID in Prepare: " + dtid) + return vterrors.VT09025("duplicate DTID in Prepare: " + dtid) } if len(pp.conns) >= pp.capacity { - return fmt.Errorf("prepared transactions exceeded limit: %d", pp.capacity) + return vterrors.New(vtrpcpb.Code_RESOURCE_EXHAUSTED, fmt.Sprintf("prepared transactions exceeded limit: %d", pp.capacity)) } pp.conns[dtid] = c return nil @@ -95,6 +103,11 @@ func (pp *TxPreparedPool) FetchForRollback(dtid string) *StatefulConnection { func (pp *TxPreparedPool) FetchForCommit(dtid string) (*StatefulConnection, error) { pp.mu.Lock() defer pp.mu.Unlock() + // If the pool is shutdown, we don't have any connections to return. + // That however doesn't mean this transaction was committed, it could very well have been rollbacked. + if pp.shutdown { + return nil, vterrors.VT09025("pool is shutdown") + } if err, ok := pp.reserved[dtid]; ok { return nil, err } @@ -121,11 +134,12 @@ func (pp *TxPreparedPool) Forget(dtid string) { delete(pp.reserved, dtid) } -// FetchAll removes all connections and returns them as a list. +// FetchAllForRollback removes all connections and returns them as a list. // It also forgets all reserved dtids. -func (pp *TxPreparedPool) FetchAll() []*StatefulConnection { +func (pp *TxPreparedPool) FetchAllForRollback() []*StatefulConnection { pp.mu.Lock() defer pp.mu.Unlock() + pp.shutdown = true conns := make([]*StatefulConnection, 0, len(pp.conns)) for _, c := range pp.conns { conns = append(conns, c) diff --git a/go/vt/vttablet/tabletserver/tx_prep_pool_test.go b/go/vt/vttablet/tabletserver/tx_prep_pool_test.go index cd2b5a180c1..42e2b800e0e 100644 --- a/go/vt/vttablet/tabletserver/tx_prep_pool_test.go +++ b/go/vt/vttablet/tabletserver/tx_prep_pool_test.go @@ -25,11 +25,8 @@ import ( func TestEmptyPrep(t *testing.T) { pp := NewTxPreparedPool(0) - want := "prepared transactions exceeded limit: 0" err := pp.Put(nil, "aa") - if err == nil || err.Error() != want { - t.Errorf("Put err: %v, want %s", err, want) - } + require.ErrorContains(t, err, "prepared transactions exceeded limit: 0") } func TestPrepPut(t *testing.T) { @@ -38,23 +35,15 @@ func TestPrepPut(t *testing.T) { require.NoError(t, err) err = pp.Put(nil, "bb") require.NoError(t, err) - want := "prepared transactions exceeded limit: 2" err = pp.Put(nil, "cc") - if err == nil || err.Error() != want { - t.Errorf("Put err: %v, want %s", err, want) - } + require.ErrorContains(t, err, "prepared transactions exceeded limit: 2") err = pp.Put(nil, "aa") - want = "duplicate DTID in Prepare: aa" - if err == nil || err.Error() != want { - t.Errorf("Put err: %v, want %s", err, want) - } + require.ErrorContains(t, err, "duplicate DTID in Prepare: aa") + _, err = pp.FetchForCommit("aa") require.NoError(t, err) err = pp.Put(nil, "aa") - want = "duplicate DTID in Prepare: aa" - if err == nil || err.Error() != want { - t.Errorf("Put err: %v, want %s", err, want) - } + require.ErrorContains(t, err, "duplicate DTID in Prepare: aa") pp.Forget("aa") err = pp.Put(nil, "aa") require.NoError(t, err) @@ -113,11 +102,9 @@ func TestPrepFetchAll(t *testing.T) { conn2 := &StatefulConnection{} pp.Put(conn1, "aa") pp.Put(conn2, "bb") - got := pp.FetchAll() - if len(got) != 2 { - t.Errorf("FetchAll len: %d, want 2", len(got)) - } - if len(pp.conns) != 0 { - t.Errorf("len(pp.conns): %d, want 0", len(pp.conns)) - } + got := pp.FetchAllForRollback() + require.Len(t, got, 2) + require.Len(t, pp.conns, 0) + _, err := pp.FetchForCommit("aa") + require.ErrorContains(t, err, "pool is shutdown") }