Skip to content

Commit

Permalink
Atomic Transaction bug fix with PRS disruption (vitessio#16576)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Aug 12, 2024
1 parent 9018fef commit 3c2e8f9
Show file tree
Hide file tree
Showing 22 changed files with 359 additions and 96 deletions.
24 changes: 13 additions & 11 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,12 @@ type Vttablet struct {

// Keyspace : Cluster accepts keyspace to launch it
type Keyspace struct {
Name string
SchemaSQL string
VSchema string
SidecarDBName string
Shards []Shard
Name string
SchemaSQL string
VSchema string
SidecarDBName string
DurabilityPolicy string
Shards []Shard
}

// Shard with associated vttablets
Expand Down Expand Up @@ -284,9 +285,10 @@ func (cluster *LocalProcessCluster) startPartialKeyspace(keyspace Keyspace, shar

cluster.HasPartialKeyspaces = true
routedKeyspace := &Keyspace{
Name: fmt.Sprintf("%s_routed", keyspace.Name),
SchemaSQL: keyspace.SchemaSQL,
VSchema: keyspace.VSchema,
Name: fmt.Sprintf("%s_routed", keyspace.Name),
SchemaSQL: keyspace.SchemaSQL,
VSchema: keyspace.VSchema,
DurabilityPolicy: keyspace.DurabilityPolicy,
}

err = cluster.startKeyspace(*routedKeyspace, shardNames, replicaCount, rdonly, customizers...)
Expand Down Expand Up @@ -374,7 +376,7 @@ func (cluster *LocalProcessCluster) startKeyspace(keyspace Keyspace, shardNames
keyspace.SidecarDBName = sidecar.DefaultName
}
// Create the keyspace if it doesn't already exist.
_ = cluster.VtctlProcess.CreateKeyspace(keyspace.Name, keyspace.SidecarDBName)
_ = cluster.VtctlProcess.CreateKeyspace(keyspace.Name, keyspace.SidecarDBName, keyspace.DurabilityPolicy)
for _, shardName := range shardNames {
shard := &Shard{
Name: shardName,
Expand Down Expand Up @@ -538,7 +540,7 @@ func (cluster *LocalProcessCluster) StartKeyspaceLegacy(keyspace Keyspace, shard
keyspace.SidecarDBName = sidecar.DefaultName
}
// Create the keyspace if it doesn't already exist.
_ = cluster.VtctlProcess.CreateKeyspace(keyspace.Name, keyspace.SidecarDBName)
_ = cluster.VtctlProcess.CreateKeyspace(keyspace.Name, keyspace.SidecarDBName, keyspace.DurabilityPolicy)
var mysqlctlProcessList []*exec.Cmd
for _, shardName := range shardNames {
shard := &Shard{
Expand Down Expand Up @@ -681,7 +683,7 @@ func (cluster *LocalProcessCluster) SetupCluster(keyspace *Keyspace, shards []Sh

if !cluster.ReusingVTDATAROOT {
// Create Keyspace
err = cluster.VtctlProcess.CreateKeyspace(keyspace.Name, keyspace.SidecarDBName)
err = cluster.VtctlProcess.CreateKeyspace(keyspace.Name, keyspace.SidecarDBName, keyspace.DurabilityPolicy)
if err != nil {
log.Error(err)
return
Expand Down
16 changes: 8 additions & 8 deletions go/test/endtoend/cluster/vtctl_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ func (vtctl *VtctlProcess) AddCellInfo(Cell string) (err error) {
}

// CreateKeyspace executes vtctl command to create keyspace
func (vtctl *VtctlProcess) CreateKeyspace(keyspace, sidecarDBName string) (err error) {
var output string
// For upgrade/downgrade tests where an older version is also used.
if vtctl.VtctlMajorVersion < 17 {
log.Errorf("CreateKeyspace does not support the --sidecar-db-name flag in vtctl version %d; ignoring...", vtctl.VtctlMajorVersion)
output, err = vtctl.ExecuteCommandWithOutput("CreateKeyspace", keyspace)
} else {
output, err = vtctl.ExecuteCommandWithOutput("CreateKeyspace", keyspace, "--sidecar-db-name", sidecarDBName)
func (vtctl *VtctlProcess) CreateKeyspace(keyspace, sidecarDBName, durabilityPolicy string) error {
args := []string{
"CreateKeyspace", keyspace,
"--sidecar-db-name", sidecarDBName,
}
if durabilityPolicy != "" {
args = append(args, "--durability-policy", durabilityPolicy)
}
output, err := vtctl.ExecuteCommandWithOutput(args...)
if err != nil {
log.Errorf("CreateKeyspace returned err: %s, output: %s", err, output)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func initializeCluster(t *testing.T) (int, error) {
for _, keyspaceStr := range []string{keyspace} {
KeyspacePtr := &cluster.Keyspace{Name: keyspaceStr}
keyspace := *KeyspacePtr
if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspace.Name, sidecar.DefaultName); err != nil {
if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspace.Name, sidecar.DefaultName, ""); err != nil {
return 1, err
}
shard := &cluster.Shard{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func clusterSetUp(t *testing.T) (int, error) {
for _, keyspaceStr := range []string{keyspace} {
KeyspacePtr := &cluster.Keyspace{Name: keyspaceStr}
keyspace := *KeyspacePtr
if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspace.Name, sidecar.DefaultName); err != nil {
if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspace.Name, sidecar.DefaultName, ""); err != nil {
return 1, err
}
shard := &cluster.Shard{
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/mysqlctl/mysqlctl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestMain(m *testing.M) {
return 1
}

if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspaceName, sidecar.DefaultName); err != nil {
if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspaceName, sidecar.DefaultName, ""); err != nil {
return 1
}

Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/mysqlctld/mysqlctld_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestMain(m *testing.M) {
return 1
}

if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspaceName, sidecar.DefaultName); err != nil {
if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspaceName, sidecar.DefaultName, ""); err != nil {
return 1
}

Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/sharded/sharded_keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestMain(m *testing.M) {
if err := clusterInstance.StartTopo(); err != nil {
return 1, err
}
if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspaceName, sidecar.DefaultName); err != nil {
if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspaceName, sidecar.DefaultName, ""); err != nil {
return 1, err
}

Expand Down
28 changes: 3 additions & 25 deletions go/test/endtoend/transaction/twopc/fuzzer/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/transaction/twopc/utils"
)

var (
Expand Down Expand Up @@ -110,29 +111,6 @@ func start(t *testing.T) (*mysql.Conn, func()) {
func cleanup(t *testing.T) {
cluster.PanicHandler(t)

ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

clearOutTable(t, conn, "twopc_fuzzer_insert")
clearOutTable(t, conn, "twopc_fuzzer_update")
}

// clearOutTable deletes everything from a table. Sometimes the table might have more rows than allowed in a single delete query,
// so we have to do the deletions iteratively.
func clearOutTable(t *testing.T, conn *mysql.Conn, tableName string) {
for {
res, err := conn.ExecuteFetch(fmt.Sprintf("SELECT count(*) FROM %v", tableName), 1, false)
require.NoError(t, err)
require.Len(t, res.Rows, 1)
require.Len(t, res.Rows[0], 1)
rowCount, err := res.Rows[0][0].ToInt()
require.NoError(t, err)
if rowCount == 0 {
return
}
_, err = conn.ExecuteFetch(fmt.Sprintf("DELETE FROM %v LIMIT 10000", tableName), 10000, false)
require.NoError(t, err)
}
utils.ClearOutTable(t, vtParams, "twopc_fuzzer_insert")
utils.ClearOutTable(t, vtParams, "twopc_fuzzer_update")
}
22 changes: 9 additions & 13 deletions go/test/endtoend/transaction/twopc/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/test/endtoend/transaction/twopc/utils"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand Down Expand Up @@ -85,12 +85,13 @@ func TestMain(m *testing.M) {

// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: SchemaSQL,
VSchema: VSchema,
SidecarDBName: sidecarDBName,
Name: keyspaceName,
SchemaSQL: SchemaSQL,
VSchema: VSchema,
SidecarDBName: sidecarDBName,
DurabilityPolicy: "semi_sync",
}
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 0, false); err != nil {
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false); err != nil {
return 1
}

Expand Down Expand Up @@ -119,13 +120,8 @@ func start(t *testing.T) (*mysql.Conn, func()) {

func cleanup(t *testing.T) {
cluster.PanicHandler(t)

ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

_, _ = utils.ExecAllowError(t, conn, "delete from twopc_user")
utils.ClearOutTable(t, vtParams, "twopc_user")
utils.ClearOutTable(t, vtParams, "twopc_t1")
}

type extractInterestingValues func(dtidMap map[string]string, vals []sqltypes.Value) []sqltypes.Value
Expand Down
6 changes: 6 additions & 0 deletions go/test/endtoend/transaction/twopc/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,10 @@ create table twopc_music (
user_id bigint,
title varchar(64),
primary key (id)
) Engine=InnoDB;

-- twopc_t1 is the table the disruption test inserts into and reads back.
-- The primary key is the composite (id, col).
create table twopc_t1 (
id bigint,
col bigint,
primary key (id, col)
) Engine=InnoDB;
126 changes: 126 additions & 0 deletions go/test/endtoend/transaction/twopc/twopc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
_ "embed"
"fmt"
"os"
"path"
"reflect"
"sort"
"strings"
Expand All @@ -35,11 +37,17 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
)

const (
DebugDelayCommitShard = "VT_DELAY_COMMIT_SHARD"
DebugDelayCommitTime = "VT_DELAY_COMMIT_TIME"
)

// TestDTCommit tests distributed transaction commit for insert, update and delete operations
// It verifies the binlog events for the same with transaction state changes and redo statements.
func TestDTCommit(t *testing.T) {
Expand Down Expand Up @@ -955,3 +963,121 @@ func testWarningAndTransactionStatus(t *testing.T, conn *vtgateconn.VTGateSessio
assert.Equal(t, txParticipants, tx.participants)
}
}

// TestDisruptions verifies that an atomic (multi-shard) transaction still ends
// up fully committed when a disruption is triggered while the commit is in
// flight.
//
// Every scenario runs as its own subtest and follows the same script: open a
// transaction, insert one row per shard, ask the tablets of shard "80-" to
// delay their commit, issue the commit in the background, run the disruption
// and finally wait for all three rows to become visible.
func TestDisruptions(t *testing.T) {
	noDisruption := func() error { return nil }
	scenarios := []struct {
		disruptionName  string
		commitDelayTime string
		disruption      func() error
	}{
		{disruptionName: "No Disruption", commitDelayTime: "1", disruption: noDisruption},
		{disruptionName: "PlannedReparentShard", commitDelayTime: "5", disruption: prsShard3},
	}

	for _, sc := range scenarios {
		subtestName := fmt.Sprintf("%s-%ss timeout", sc.disruptionName, sc.commitDelayTime)
		t.Run(subtestName, func(t *testing.T) {
			// Begin from a known topology: the first tablet of every shard is primary.
			reparentToFistTablet(t)
			// start hands back a connection together with its closer.
			// NOTE(review): the original author states that it also clears out
			// old data; start is defined outside this view - confirm.
			conn, closer := start(t)
			defer closer()

			// Open the atomic transaction and insert rows 4, 6 and 9. The original
			// author notes that twopc_t1 is sharded on reverse_bits, which is what
			// spreads these rows over all three shards.
			statements := []string{
				"begin",
				"insert into twopc_t1(id, col) values(4, 4)",
				"insert into twopc_t1(id, col) values(6, 4)",
				"insert into twopc_t1(id, col) values(9, 4)",
			}
			for _, stmt := range statements {
				utils.Exec(t, conn, stmt)
			}

			// Simulate a slow commit on one shard by leaving instructions in the
			// communication files; they are removed again when the subtest ends.
			writeTestCommunicationFile(t, DebugDelayCommitShard, "80-")
			defer deleteFile(DebugDelayCommitShard)
			writeTestCommunicationFile(t, DebugDelayCommitTime, sc.commitDelayTime)
			defer deleteFile(DebugDelayCommitTime)

			// The commit is expected to take a while, so it runs in its own
			// goroutine and the disruption can be injected while it is ongoing.
			var commitDone sync.WaitGroup
			commitDone.Add(1)
			go func() {
				defer commitDone.Done()
				if _, commitErr := utils.ExecAllowError(t, conn, "commit"); commitErr != nil {
					log.Errorf("Error in commit - %v", commitErr)
				}
			}()

			// Give the commit enough time to have started before disrupting it.
			time.Sleep(time.Second)
			require.NoError(t, sc.disruption())

			// The commit's own error is deliberately not asserted on: the user may
			// well receive one. Because the delay happens in CommitPrepared, the
			// decision to commit has already been taken by then, so only the
			// final table contents matter.
			commitDone.Wait()
			waitForResults(t, "select id, col from twopc_t1 where col = 4 order by id", `[[INT64(4) INT64(4)] [INT64(6) INT64(4)] [INT64(9) INT64(4)]]`, 10*time.Second)
		})
	}
}

// reparentToFistTablet runs a PlannedReparentShard on every shard of the first
// keyspace in the cluster so that the shard's first tablet becomes its primary.
// The test fails immediately if any of the reparents returns an error.
func reparentToFistTablet(t *testing.T) {
	shards := clusterInstance.Keyspaces[0].Shards
	for i := range shards {
		firstTablet := shards[i].Vttablets[0]
		require.NoError(t, clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shards[i].Name, firstTablet.Alias))
	}
}

// writeTestCommunicationFile stores content in a file called fileName inside
// the directory named by the VTDATAROOT environment variable, failing the test
// if the write does not succeed.
// These files are how the test coordinates with vttablets running in debug mode.
func writeTestCommunicationFile(t *testing.T, fileName string, content string) {
	// NOTE(review): path.Join always joins with forward slashes; filepath.Join
	// would be the OS-aware choice if these tests ever run off Unix.
	target := path.Join(os.Getenv("VTDATAROOT"), fileName)
	require.NoError(t, os.WriteFile(target, []byte(content), 0644))
}

// deleteFile removes the file called fileName from the directory named by the
// VTDATAROOT environment variable.
// Removal is best effort: any error, including the file not existing, is ignored.
func deleteFile(fileName string) {
	target := path.Join(os.Getenv("VTDATAROOT"), fileName)
	_ = os.Remove(target)
}

// waitForResults polls query until its rows, formatted with %v, are equal to
// resultExpected. It fails the test if that has not happened by the time
// waitTime has elapsed.
//
// Every attempt opens its own connection using vtParams and closes it before
// the next attempt. The deadline is only checked between attempts, so an
// attempt that is already running is allowed to finish; attempts are spaced
// 100ms apart.
func waitForResults(t *testing.T, query string, resultExpected string, waitTime time.Duration) {
	t.Helper()

	// attempt runs the query once and reports whether the rows match. The
	// connection is closed through a defer so that it is released even when the
	// attempt aborts the test part-way through (require.NoError does so, and
	// utils.Exec presumably does as well on a query error - it is defined
	// outside this file). A plain Close call after the query would be skipped
	// in that case.
	attempt := func() bool {
		conn, err := mysql.Connect(context.Background(), &vtParams)
		require.NoError(t, err)
		defer conn.Close()
		res := utils.Exec(t, conn, query)
		return fmt.Sprintf("%v", res.Rows) == resultExpected
	}

	timeout := time.After(waitTime)
	for {
		select {
		case <-timeout:
			t.Fatalf("didn't reach expected results for %s", query)
		default:
		}
		if attempt() {
			return
		}
		time.Sleep(100 * time.Millisecond)
	}
}

/*
Cluster Level Disruptions for the fuzzer
*/

// prsShard3 issues a PlannedReparentShard for the third shard of the first
// keyspace in the cluster, promoting that shard's second tablet to primary.
// It returns whatever error the reparent command reports.
func prsShard3() error {
	shards := clusterInstance.Keyspaces[0].Shards
	promoted := shards[2].Vttablets[1]
	return clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shards[2].Name, promoted.Alias)
}
Loading

0 comments on commit 3c2e8f9

Please sign in to comment.