Skip to content

Commit

Permalink
Fail VTBackup early when replication or MySQL is failing (#17356)
Browse files Browse the repository at this point in the history
Signed-off-by: Florent Poinsard <[email protected]>
  • Loading branch information
frouioui authored Jan 21, 2025
1 parent d6cd88c commit bf2423f
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 12 deletions.
27 changes: 26 additions & 1 deletion go/cmd/vtbackup/cli/vtbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cli
import (
"context"
"crypto/rand"
"errors"
"fmt"
"math"
"math/big"
Expand Down Expand Up @@ -65,6 +66,8 @@ const (
phaseNameTakeNewBackup = "TakeNewBackup"
phaseStatusCatchupReplicationStalled = "Stalled"
phaseStatusCatchupReplicationStopped = "Stopped"

timeoutWaitingForReplicationStatus = 60 * time.Second
)

var (
Expand Down Expand Up @@ -335,6 +338,18 @@ func takeBackup(ctx, backgroundCtx context.Context, topoServer *topo.Server, bac
if err != nil {
return fmt.Errorf("failed to initialize mysql config: %v", err)
}
ctx, cancelCtx := context.WithCancel(ctx)
backgroundCtx, cancelBackgroundCtx := context.WithCancel(backgroundCtx)
defer func() {
cancelCtx()
cancelBackgroundCtx()
}()
mysqld.OnTerm(func() {
log.Warning("Cancelling vtbackup as MySQL has terminated")
cancelCtx()
cancelBackgroundCtx()
})

initCtx, initCancel := context.WithTimeout(ctx, mysqlTimeout)
defer initCancel()
initMysqldAt := time.Now()
Expand Down Expand Up @@ -520,7 +535,13 @@ func takeBackup(ctx, backgroundCtx context.Context, topoServer *topo.Server, bac

waitStartTime = time.Now()
)

lastErr := vterrors.NewLastError("replication catch up", timeoutWaitingForReplicationStatus)
for {
if !lastErr.ShouldRetry() {
return fmt.Errorf("timeout waiting for replication status after %.0f seconds", timeoutWaitingForReplicationStatus.Seconds())
}

select {
case <-ctx.Done():
return fmt.Errorf("error in replication catch up: %v", ctx.Err())
Expand All @@ -530,6 +551,7 @@ func takeBackup(ctx, backgroundCtx context.Context, topoServer *topo.Server, bac
lastStatus = status
status, statusErr = mysqld.ReplicationStatus(ctx)
if statusErr != nil {
lastErr.Record(statusErr)
log.Warningf("Error getting replication status: %v", statusErr)
continue
}
Expand All @@ -548,7 +570,10 @@ func takeBackup(ctx, backgroundCtx context.Context, topoServer *topo.Server, bac
}
}
if !status.Healthy() {
log.Warning("Replication has stopped before backup could be taken. Trying to restart replication.")
errStr := "Replication has stopped before backup could be taken. Trying to restart replication."
log.Warning(errStr)
lastErr.Record(errors.New(strings.ToLower(errStr)))

phaseStatus.Set([]string{phaseNameCatchupReplication, phaseStatusCatchupReplicationStopped}, 1)
if err := startReplication(ctx, mysqld, topoServer); err != nil {
log.Warningf("Failed to restart replication: %v", err)
Expand Down
87 changes: 76 additions & 11 deletions go/test/endtoend/backup/vtbackup/backup_only_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,55 @@ var (
) Engine=InnoDB;`
)

// TestFailingReplication checks how vtbackup behaves when it cannot replicate
// from the primary:
//   - with the replication grant of 'vt_repl' revoked, starting vtbackup must
//     return an error;
//   - when the grant is restored roughly 30 seconds into a second vtbackup run,
//     that run must complete and add exactly one backup.
func TestFailingReplication(t *testing.T) {
	prepareCluster(t)

	// Run the entire backup test, keeping the backups it created.
	firstBackupTest(t, false)

	// Insert one more row, the primary will be ahead of the last backup
	_, err := primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test_failure')", keyspaceName, true)
	require.NoError(t, err)

	// Disable replication from the primary by removing the grants to 'vt_repl'.
	_, err = primary.VttabletProcess.QueryTablet("REVOKE REPLICATION SLAVE ON *.* FROM 'vt_repl'@'%';", keyspaceName, true)
	require.NoError(t, err)
	_, err = primary.VttabletProcess.QueryTablet("FLUSH PRIVILEGES;", keyspaceName, true)
	require.NoError(t, err)

	// Take a backup with vtbackup: the process should fail entirely as it cannot replicate from the primary.
	_, err = startVtBackup(t, false, false, false)
	require.Error(t, err)

	// keep in mind how many backups we have right now
	backups, err := listBackups(shardKsName)
	require.NoError(t, err)

	// In 30 seconds, grant the replication permission again to 'vt_repl'.
	// This will mean that vtbackup should fail to replicate for ~30 seconds, until we grant the permission again.
	//
	// The goroutine reports failures with t.Errorf rather than require.NoError:
	// require calls t.FailNow, which must only be called from the goroutine
	// running the test. It also uses its own error variables instead of
	// writing to the test's err from another goroutine.
	grantDone := make(chan struct{})
	go func() {
		defer close(grantDone)
		time.Sleep(30 * time.Second)
		if _, grantErr := primary.VttabletProcess.QueryTablet("GRANT REPLICATION SLAVE ON *.* TO 'vt_repl'@'%';", keyspaceName, true); grantErr != nil {
			t.Errorf("failed to grant replication permission back to 'vt_repl': %v", grantErr)
			return
		}
		if _, flushErr := primary.VttabletProcess.QueryTablet("FLUSH PRIVILEGES;", keyspaceName, true); flushErr != nil {
			t.Errorf("failed to flush privileges after re-granting replication permission: %v", flushErr)
		}
	}()
	// Never let the test function return while the goroutine may still use t,
	// including when a require below aborts the test early.
	defer func() { <-grantDone }()

	startTime := time.Now()
	// this will initially be stuck trying to replicate from the primary, and once we re-grant the permission in
	// the goroutine above, the process will work and complete successfully.
	_ = vtBackup(t, false, false, false)

	// Measure before joining the goroutine so the wait below cannot inflate the duration.
	elapsed := time.Since(startTime)
	<-grantDone

	require.GreaterOrEqual(t, elapsed.Seconds(), float64(30))

	verifyBackupCount(t, shardKsName, len(backups)+1)

	removeBackups(t)
	verifyBackupCount(t, shardKsName, 0)

	tearDown(t, true)
}

func TestTabletInitialBackup(t *testing.T) {
// Test Initial Backup Flow
// TestTabletInitialBackup will:
Expand All @@ -59,6 +108,15 @@ func TestTabletInitialBackup(t *testing.T) {
// - Bring up a second replica, and restore from the second backup
// - list the backups, remove them

prepareCluster(t)

// Run the entire backup test
firstBackupTest(t, true)

tearDown(t, true)
}

func prepareCluster(t *testing.T) {
waitForReplicationToCatchup([]cluster.Vttablet{*replica1, *replica2})

dataPointReader := vtBackup(t, true, false, false)
Expand All @@ -84,11 +142,6 @@ func TestTabletInitialBackup(t *testing.T) {
"TabletExternallyReparented", primary.Alias)
require.NoError(t, err)
restore(t, replica1, "replica", "SERVING")

// Run the entire backup test
firstBackupTest(t, "replica")

tearDown(t, true)
}

func TestTabletBackupOnly(t *testing.T) {
Expand All @@ -107,12 +160,12 @@ func TestTabletBackupOnly(t *testing.T) {
replica1.VttabletProcess.ServingStatus = "NOT_SERVING"

initTablets(t, true, true)
firstBackupTest(t, "replica")
firstBackupTest(t, true)

tearDown(t, false)
}

func firstBackupTest(t *testing.T, tabletType string) {
func firstBackupTest(t *testing.T, removeBackup bool) {
// Test First Backup flow.
//
// firstBackupTest will:
Expand Down Expand Up @@ -168,11 +221,13 @@ func firstBackupTest(t *testing.T, tabletType string) {
// check the new replica has the data
cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 2)

removeBackups(t)
verifyBackupCount(t, shardKsName, 0)
if removeBackup {
removeBackups(t)
verifyBackupCount(t, shardKsName, 0)
}
}

func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedoLog bool) *opentsdb.DataPointReader {
func startVtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedoLog bool) (*os.File, error) {
mysqlSocket, err := os.CreateTemp("", "vtbackup_test_mysql.sock")
require.NoError(t, err)
defer os.Remove(mysqlSocket.Name())
Expand Down Expand Up @@ -207,9 +262,19 @@ func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedo

log.Infof("starting backup tablet %s", time.Now())
err = localCluster.StartVtbackup(newInitDBFile, initialBackup, keyspaceName, shardName, cell, extraArgs...)
require.NoError(t, err)
if err != nil {
return nil, err
}

f, err := os.OpenFile(statsPath, os.O_RDONLY, 0)
if err != nil {
return nil, err
}
return f, nil
}

// vtBackup runs vtbackup through startVtBackup, fails the test immediately if
// that returns an error, and wraps the returned file in an opentsdb
// DataPointReader.
func vtBackup(t *testing.T, initialBackup bool, restartBeforeBackup, disableRedoLog bool) *opentsdb.DataPointReader {
	statsFile, startErr := startVtBackup(t, initialBackup, restartBeforeBackup, disableRedoLog)
	require.NoError(t, startErr)

	reader := opentsdb.NewDataPointReader(statsFile)
	return reader
}
Expand Down

0 comments on commit bf2423f

Please sign in to comment.