From cc8add39df4c4474b5eaa69ac405ce5551556628 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Wed, 30 Oct 2024 19:17:38 +0530 Subject: [PATCH] handle startcommit state in vtgate to rollback prepared transaction when the failure is certain Signed-off-by: Harshit Gangal --- go/vt/vtgate/tx_conn.go | 44 ++++++++++++++++++----- go/vt/vtgate/tx_conn_test.go | 38 +++++++++++++------- go/vt/vttablet/sandboxconn/sandboxconn.go | 40 +++++++++++++++++---- 3 files changed, 94 insertions(+), 28 deletions(-) diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index 82ec72a694a..315484ea499 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -70,6 +70,14 @@ const ( Commit2pcConclude ) +var phaseMessage = map[commitPhase]string{ + Commit2pcCreateTransaction: "Create Transaction", + Commit2pcPrepare: "Prepare", + Commit2pcStartCommit: "Start Commit", + Commit2pcPrepareCommit: "Prepare Commit", + Commit2pcConclude: "Conclude", +} + // Begin begins a new transaction. If one is already in progress, it commits it // and starts a new one. func (txc *TxConn) Begin(ctx context.Context, session *SafeSession, txAccessModes []sqlparser.TxAccessMode) error { @@ -221,11 +229,12 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err err } var txPhase commitPhase + var startCommitState querypb.StartCommitState defer func() { if err == nil { return } - txc.errActionAndLogWarn(ctx, session, txPhase, dtid, mmShard, rmShards) + txc.errActionAndLogWarn(ctx, session, txPhase, startCommitState, dtid, mmShard, rmShards) }() txPhase = Commit2pcCreateTransaction @@ -259,7 +268,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err err } txPhase = Commit2pcStartCommit - _, err = txc.tabletGateway.StartCommit(ctx, mmShard.Target, mmShard.TransactionId, dtid) + startCommitState, err = txc.tabletGateway.StartCommit(ctx, mmShard.Target, mmShard.TransactionId, dtid) if err != nil { return err } @@ -298,21 +307,38 @@ func (txc *TxConn) checkValidCondition(session *SafeSession) error { return nil } -func (txc *TxConn) errActionAndLogWarn(ctx context.Context, session *SafeSession, txPhase commitPhase, dtid string, mmShard *vtgatepb.Session_ShardSession, rmShards []*vtgatepb.Session_ShardSession) { +func (txc *TxConn) errActionAndLogWarn( + ctx context.Context, + session *SafeSession, + txPhase commitPhase, + startCommitState querypb.StartCommitState, + dtid string, + mmShard *vtgatepb.Session_ShardSession, + rmShards []*vtgatepb.Session_ShardSession, +) { + var rollbackErr error switch txPhase { case Commit2pcCreateTransaction: // Normal rollback is safe because nothing was prepared yet. - if rollbackErr := txc.Rollback(ctx, session); rollbackErr != nil { - log.Warningf("Rollback failed after Create Transaction failure: %v", rollbackErr) - } + rollbackErr = txc.Rollback(ctx, session) case Commit2pcPrepare: // Rollback the prepared and unprepared transactions. - if resumeErr := txc.rollbackTx(ctx, dtid, mmShard, rmShards, session.logging); resumeErr != nil { - log.Warningf("Rollback failed after Prepare failure: %v", resumeErr) + rollbackErr = txc.rollbackTx(ctx, dtid, mmShard, rmShards, session.logging) + case Commit2pcStartCommit: + // Failed to store the commit decision on MM. + // If the failure state is certain, then the only option is to rollback the prepared transactions on the RMs. + if startCommitState == querypb.StartCommitState_Fail { + rollbackErr = txc.rollbackTx(ctx, dtid, mmShard, rmShards, session.logging) } - case Commit2pcStartCommit, Commit2pcPrepareCommit: + fallthrough + case Commit2pcPrepareCommit: + commitUnresolved.Add(1) + } + if rollbackErr != nil { + log.Warningf("Rollback failed after %s failure: %v", phaseMessage[txPhase], rollbackErr) commitUnresolved.Add(1) } + session.RecordWarning(&querypb.QueryWarning{ Code: uint32(sqlerror.ERInAtomicRecovery), Message: createWarningMessage(dtid, txPhase)}) diff --git a/go/vt/vtgate/tx_conn_test.go b/go/vt/vtgate/tx_conn_test.go index dd1415bce87..9d49626f6f1 100644 --- a/go/vt/vtgate/tx_conn_test.go +++ b/go/vt/vtgate/tx_conn_test.go @@ -994,9 +994,7 @@ func TestTxConnCommit2PCCreateTransactionFail(t *testing.T) { sbc0.MustFailCreateTransaction = 1 session.TransactionMode = vtgatepb.TransactionMode_TWOPC err := sc.txConn.Commit(ctx, session) - want := "error: err" - require.Error(t, err) - assert.Contains(t, err.Error(), want, "Commit") + require.ErrorContains(t, err, "target: TestTxConnCommit2PCCreateTransactionFail.0.primary: error: err") assert.EqualValues(t, 1, sbc0.CreateTransactionCount.Load(), "sbc0.CreateTransactionCount") assert.EqualValues(t, 1, sbc0.RollbackCount.Load(), "sbc0.RollbackCount") assert.EqualValues(t, 1, sbc1.RollbackCount.Load(), "sbc1.RollbackCount") @@ -1018,9 +1016,7 @@ func TestTxConnCommit2PCPrepareFail(t *testing.T) { sbc1.MustFailPrepare = 1 session.TransactionMode = vtgatepb.TransactionMode_TWOPC err := sc.txConn.Commit(ctx, session) - want := "error: err" - require.Error(t, err) - assert.Contains(t, err.Error(), want, "Commit") + require.ErrorContains(t, err, "target: TestTxConnCommit2PCPrepareFail.1.primary: error: err") assert.EqualValues(t, 1, sbc0.CreateTransactionCount.Load(), "sbc0.CreateTransactionCount") assert.EqualValues(t, 1, sbc1.PrepareCount.Load(), "sbc1.PrepareCount") // Prepared failed on RM, so no commit on MM or RMs. @@ -1046,13 +1042,33 @@ func TestTxConnCommit2PCStartCommitFail(t *testing.T) { sbc0.MustFailStartCommit = 1 session.TransactionMode = vtgatepb.TransactionMode_TWOPC err := sc.txConn.Commit(ctx, session) - want := "error: err" - require.Error(t, err) - assert.Contains(t, err.Error(), want, "Commit") + require.ErrorContains(t, err, "target: TestTxConnCommit2PCStartCommitFail.0.primary: error: err") assert.EqualValues(t, 1, sbc0.CreateTransactionCount.Load(), "sbc0.CreateTransactionCount") assert.EqualValues(t, 1, sbc1.PrepareCount.Load(), "sbc1.PrepareCount") assert.EqualValues(t, 1, sbc0.StartCommitCount.Load(), "sbc0.StartCommitCount") assert.EqualValues(t, 0, sbc1.CommitPreparedCount.Load(), "sbc1.CommitPreparedCount") + assert.EqualValues(t, 1, sbc0.SetRollbackCount.Load(), "MM") + assert.EqualValues(t, 1, sbc1.RollbackPreparedCount.Load(), "RM") + assert.EqualValues(t, 1, sbc0.ConcludeTransactionCount.Load(), "sbc0.ConcludeTransactionCount") + + sbc0.ResetCounter() + sbc1.ResetCounter() + + session = NewSafeSession(&vtgatepb.Session{InTransaction: true}) + sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) + sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false, nullResultsObserver{}) + + // Here the StartCommit failure is in uncertain state so rollback is not called and neither conclude. + sbc0.MustFailStartCommitUncertain = 1 + session.TransactionMode = vtgatepb.TransactionMode_TWOPC + err = sc.txConn.Commit(ctx, session) + require.ErrorContains(t, err, "target: TestTxConnCommit2PCStartCommitFail.0.primary: uncertain error") + assert.EqualValues(t, 1, sbc0.CreateTransactionCount.Load(), "sbc0.CreateTransactionCount") + assert.EqualValues(t, 1, sbc1.PrepareCount.Load(), "sbc1.PrepareCount") + assert.EqualValues(t, 1, sbc0.StartCommitCount.Load(), "sbc0.StartCommitCount") + assert.EqualValues(t, 0, sbc1.CommitPreparedCount.Load(), "sbc1.CommitPreparedCount") + assert.EqualValues(t, 0, sbc0.SetRollbackCount.Load(), "MM") + assert.EqualValues(t, 0, sbc1.RollbackPreparedCount.Load(), "RM") assert.EqualValues(t, 0, sbc0.ConcludeTransactionCount.Load(), "sbc0.ConcludeTransactionCount") } @@ -1068,9 +1084,7 @@ func TestTxConnCommit2PCCommitPreparedFail(t *testing.T) { sbc1.MustFailCommitPrepared = 1 session.TransactionMode = vtgatepb.TransactionMode_TWOPC err := sc.txConn.Commit(ctx, session) - want := "error: err" - require.Error(t, err) - assert.Contains(t, err.Error(), want, "Commit") + require.ErrorContains(t, err, "target: TestTxConnCommit2PCCommitPreparedFail.1.primary: error: err") assert.EqualValues(t, 1, sbc0.CreateTransactionCount.Load(), "sbc0.CreateTransactionCount") assert.EqualValues(t, 1, sbc1.PrepareCount.Load(), "sbc1.PrepareCount") assert.EqualValues(t, 1, sbc0.StartCommitCount.Load(), "sbc0.StartCommitCount") diff --git a/go/vt/vttablet/sandboxconn/sandboxconn.go b/go/vt/vttablet/sandboxconn/sandboxconn.go index 148c630543e..a34baef1238 100644 --- a/go/vt/vttablet/sandboxconn/sandboxconn.go +++ b/go/vt/vttablet/sandboxconn/sandboxconn.go @@ -49,13 +49,14 @@ type SandboxConn struct { // These errors are triggered only for specific functions. // For now these are just for the 2PC functions. - MustFailPrepare int - MustFailCommitPrepared int - MustFailRollbackPrepared int - MustFailCreateTransaction int - MustFailStartCommit int - MustFailSetRollback int - MustFailConcludeTransaction int + MustFailPrepare int + MustFailCommitPrepared int + MustFailRollbackPrepared int + MustFailCreateTransaction int + MustFailStartCommit int + MustFailStartCommitUncertain int + MustFailSetRollback int + MustFailConcludeTransaction int // MustFailExecute is keyed by the statement type and stores the number // of times to fail when it sees that statement type. // Once, exhausted it will start returning non-error response. @@ -157,6 +158,27 @@ func NewSandboxConn(t *topodatapb.Tablet) *SandboxConn { } } +// ResetCounter resets the counters in the sandboxconn. +func (sbc *SandboxConn) ResetCounter() { + sbc.ExecCount.Store(0) + sbc.BeginCount.Store(0) + sbc.CommitCount.Store(0) + sbc.RollbackCount.Store(0) + sbc.AsTransactionCount.Store(0) + sbc.PrepareCount.Store(0) + sbc.CommitPreparedCount.Store(0) + sbc.RollbackPreparedCount.Store(0) + sbc.CreateTransactionCount.Store(0) + sbc.StartCommitCount.Store(0) + sbc.SetRollbackCount.Store(0) + sbc.ConcludeTransactionCount.Store(0) + sbc.ReadTransactionCount.Store(0) + sbc.UnresolvedTransactionsCount.Store(0) + sbc.ReserveCount.Store(0) + sbc.ReleaseCount.Store(0) + sbc.GetSchemaCount.Store(0) +} + // RequireQueriesLocking sets the sandboxconn to require locking the access of Queries field. func (sbc *SandboxConn) RequireQueriesLocking() { sbc.queriesRequireLocking = true @@ -411,6 +433,10 @@ func (sbc *SandboxConn) StartCommit(context.Context, *querypb.Target, int64, str sbc.MustFailStartCommit-- return querypb.StartCommitState_Fail, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "error: err") } + if sbc.MustFailStartCommitUncertain > 0 { + sbc.MustFailStartCommitUncertain-- + return querypb.StartCommitState_Unknown, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "uncertain error") + } err = sbc.getError() if err != nil { return querypb.StartCommitState_Unknown, err