Skip to content

Commit

Permalink
save error message on retryable error to redo log state table
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Sep 20, 2024
1 parent 24ebb05 commit b657d84
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 31 deletions.
12 changes: 2 additions & 10 deletions go/vt/vttablet/tabletserver/dt_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,19 +156,11 @@ func (dte *DTExecutor) CommitPrepared(dtid string) (err error) {
// even if the original context expires.
ctx := trace.CopySpan(context.Background(), dte.ctx)
defer func() {
rollback := true
if err != nil {
log.Warningf("failed to commit the prepared transaction '%s' with error: %v", dtid, err)
if !dte.te.checkErrorAndMarkFailed(ctx, dtid, err) {
err = dte.te.preparedPool.Put(conn, dtid)
// success in putting the connection back in the pool
// we can avoid rollback and can try commit again.
rollback = err != nil
}
}
if rollback {
dte.te.txPool.RollbackAndRelease(ctx, conn)
dte.te.checkErrorAndMarkFailed(ctx, dtid, err, "TwopcCommit")
}
dte.te.txPool.RollbackAndRelease(ctx, conn)
}()
if err = dte.te.twoPC.DeleteRedo(ctx, conn, dtid); err != nil {
return err
Expand Down
42 changes: 23 additions & 19 deletions go/vt/vttablet/tabletserver/tx_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,15 @@ func (te *TxEngine) prepareFromRedo() error {
allErrs []error
)

checkErr := func(dtid string, err error) {
if err != nil {
allErrs = append(allErrs, vterrors.Wrapf(err, "dtid - %v", dtid))
if te.checkErrorAndMarkFailed(ctx, dtid, err, "TwopcPrepareRedo") {
failedCounter++
}
}
}

outer:
for _, preparedTx := range prepared {
var conn *StatefulConnection
Expand All @@ -436,12 +445,7 @@ outer:
}

// check last error to record failure.
if lastErr != nil {
allErrs = append(allErrs, vterrors.Wrapf(lastErr, "dtid - %v", lastDtid))
if te.checkErrorAndMarkFailed(ctx, lastDtid, lastErr) {
failedCounter++
}
}
checkErr(lastDtid, lastErr)

lastDtid = preparedTx.Dtid

Expand All @@ -466,12 +470,7 @@ outer:
}

// check last error to record failure.
if lastErr != nil {
allErrs = append(allErrs, vterrors.Wrapf(lastErr, "dtid - %v", lastDtid))
if te.checkErrorAndMarkFailed(ctx, lastDtid, lastErr) {
failedCounter++
}
}
checkErr(lastDtid, lastErr)

for _, preparedTx := range failed {
txID, _ := dtids.TransactionID(preparedTx.Dtid)
Expand All @@ -487,24 +486,29 @@ outer:

// checkErrorAndMarkFailed checks whether the error is retryable or non-retryable.
// If it is a non-retryable error then it marks the dtid as failed in the prepared pool,
// increments the InternalErrors counter, and also changes the state of the transaction in the redo log as failed.
func (te *TxEngine) checkErrorAndMarkFailed(ctx context.Context, dtid string, receivedErr error) (fail bool) {
// increments the InternalErrors counter, and also changes the state of the transaction in the redo log as failed.
func (te *TxEngine) checkErrorAndMarkFailed(ctx context.Context, dtid string, receivedErr error, metricName string) (fail bool) {
state := RedoStateFailed
if isRetryableError(receivedErr) {
log.Infof("retryable error for dtid: %s", dtid)
return
state = RedoStatePrepared
} else {
fail = true
te.env.Stats().InternalErrors.Add(metricName, 1)
te.preparedPool.SetFailed(dtid)
}

fail = true
te.env.Stats().InternalErrors.Add("TwopcCommit", 1)
te.preparedPool.SetFailed(dtid)
// Update the state of the transaction in the redo log.
// Retryable Error: Update the message with error message.
// Non-retryable Error: Along with message, update the state as RedoStateFailed.
conn, _, _, err := te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
if err != nil {
log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err)
return
}
defer te.txPool.RollbackAndRelease(ctx, conn)

if err = te.twoPC.UpdateRedo(ctx, conn, dtid, RedoStateFailed, receivedErr.Error()); err != nil {
if err = te.twoPC.UpdateRedo(ctx, conn, dtid, state, receivedErr.Error()); err != nil {
log.Errorf("markFailed: UpdateRedo failed for dtid %s: %v", dtid, err)
return
}
Expand Down
9 changes: 7 additions & 2 deletions go/vt/vttablet/tabletserver/tx_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,6 @@ func TestTxEngineFailReserve(t *testing.T) {
func TestCheckReceivedError(t *testing.T) {
db := setUpQueryExecutorTest(t)
defer db.Close()
// db.AddQueryPattern(".*", &sqltypes.Result{})
cfg := tabletenv.NewDefaultConfig()
cfg.DB = newDBConfigs(db)
env := tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "TabletServerTest")
Expand All @@ -626,6 +625,7 @@ func TestCheckReceivedError(t *testing.T) {
}{{
receivedErr: vterrors.New(vtrpcpb.Code_DEADLINE_EXCEEDED, "deadline exceeded"),
nonRetryable: false,
expQuery: `update _vt.redo_state set state = 1, message = 'deadline exceeded' where dtid = 'aa'`,
}, {
receivedErr: vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "invalid argument"),
nonRetryable: true,
Expand All @@ -637,34 +637,39 @@ func TestCheckReceivedError(t *testing.T) {
}, {
receivedErr: context.DeadlineExceeded,
nonRetryable: false,
expQuery: `update _vt.redo_state set state = 1, message = 'context deadline exceeded' where dtid = 'aa'`,
}, {
receivedErr: context.Canceled,
nonRetryable: false,
expQuery: `update _vt.redo_state set state = 1, message = 'context canceled' where dtid = 'aa'`,
}, {
receivedErr: sqlerror.NewSQLError(sqlerror.CRServerLost, sqlerror.SSUnknownSQLState, "Lost connection to MySQL server during query"),
nonRetryable: false,
expQuery: `update _vt.redo_state set state = 1, message = 'Lost connection to MySQL server during query (errno 2013) (sqlstate HY000)' where dtid = 'aa'`,
}, {
receivedErr: sqlerror.NewSQLError(sqlerror.CRMalformedPacket, sqlerror.SSUnknownSQLState, "Malformed packet"),
nonRetryable: true,
expQuery: `update _vt.redo_state set state = 0, message = 'Malformed packet (errno 2027) (sqlstate HY000)' where dtid = 'aa'`,
}, {
receivedErr: sqlerror.NewSQLError(sqlerror.CRServerGone, sqlerror.SSUnknownSQLState, "Server has gone away"),
nonRetryable: false,
expQuery: `update _vt.redo_state set state = 1, message = 'Server has gone away (errno 2006) (sqlstate HY000)' where dtid = 'aa'`,
}, {
receivedErr: vterrors.New(vtrpcpb.Code_ABORTED, "Row count exceeded"),
nonRetryable: true,
expQuery: `update _vt.redo_state set state = 0, message = 'Row count exceeded' where dtid = 'aa'`,
}, {
receivedErr: errors.New("(errno 2013) (sqlstate HY000) lost connection"),
nonRetryable: false,
expQuery: `update _vt.redo_state set state = 1, message = '(errno 2013) (sqlstate HY000) lost connection' where dtid = 'aa'`,
}}

for _, tc := range tcases {
t.Run(tc.receivedErr.Error(), func(t *testing.T) {
if tc.expQuery != "" {
db.AddQuery(tc.expQuery, &sqltypes.Result{})
}
nonRetryable := te.checkErrorAndMarkFailed(context.Background(), "aa", tc.receivedErr)
nonRetryable := te.checkErrorAndMarkFailed(context.Background(), "aa", tc.receivedErr, "")
require.Equal(t, tc.nonRetryable, nonRetryable)
if tc.nonRetryable {
require.Equal(t, errPrepFailed, te.preparedPool.reserved["aa"])
Expand Down

0 comments on commit b657d84

Please sign in to comment.