Skip to content

Commit

Permalink
handle startcommit state in vtgate to rollback prepared transaction when the failure is certain
Browse files Browse the repository at this point in the history

Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Oct 30, 2024
1 parent afc5ea3 commit cc8add3
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 28 deletions.
44 changes: 35 additions & 9 deletions go/vt/vtgate/tx_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ const (
Commit2pcConclude
)

// phaseMessage maps each 2PC commit phase to a human-readable phase name.
// errActionAndLogWarn uses it to build the "Rollback failed after <phase> failure"
// warning, so the phase name appears in the log message.
var phaseMessage = map[commitPhase]string{
Commit2pcCreateTransaction: "Create Transaction",
Commit2pcPrepare: "Prepare",
Commit2pcStartCommit: "Start Commit",
Commit2pcPrepareCommit: "Prepare Commit",
Commit2pcConclude: "Conclude",
}

// Begin begins a new transaction. If one is already in progress, it commits it
// and starts a new one.
func (txc *TxConn) Begin(ctx context.Context, session *SafeSession, txAccessModes []sqlparser.TxAccessMode) error {
Expand Down Expand Up @@ -221,11 +229,12 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err err
}

var txPhase commitPhase
var startCommitState querypb.StartCommitState
defer func() {
if err == nil {
return
}
txc.errActionAndLogWarn(ctx, session, txPhase, dtid, mmShard, rmShards)
txc.errActionAndLogWarn(ctx, session, txPhase, startCommitState, dtid, mmShard, rmShards)
}()

txPhase = Commit2pcCreateTransaction
Expand Down Expand Up @@ -259,7 +268,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err err
}

txPhase = Commit2pcStartCommit
_, err = txc.tabletGateway.StartCommit(ctx, mmShard.Target, mmShard.TransactionId, dtid)
startCommitState, err = txc.tabletGateway.StartCommit(ctx, mmShard.Target, mmShard.TransactionId, dtid)
if err != nil {
return err
}
Expand Down Expand Up @@ -298,21 +307,38 @@ func (txc *TxConn) checkValidCondition(session *SafeSession) error {
return nil
}

func (txc *TxConn) errActionAndLogWarn(ctx context.Context, session *SafeSession, txPhase commitPhase, dtid string, mmShard *vtgatepb.Session_ShardSession, rmShards []*vtgatepb.Session_ShardSession) {
func (txc *TxConn) errActionAndLogWarn(
ctx context.Context,
session *SafeSession,
txPhase commitPhase,
startCommitState querypb.StartCommitState,
dtid string,
mmShard *vtgatepb.Session_ShardSession,
rmShards []*vtgatepb.Session_ShardSession,
) {
var rollbackErr error
switch txPhase {
case Commit2pcCreateTransaction:
// Normal rollback is safe because nothing was prepared yet.
if rollbackErr := txc.Rollback(ctx, session); rollbackErr != nil {
log.Warningf("Rollback failed after Create Transaction failure: %v", rollbackErr)
}
rollbackErr = txc.Rollback(ctx, session)
case Commit2pcPrepare:
// Rollback the prepared and unprepared transactions.
if resumeErr := txc.rollbackTx(ctx, dtid, mmShard, rmShards, session.logging); resumeErr != nil {
log.Warningf("Rollback failed after Prepare failure: %v", resumeErr)
rollbackErr = txc.rollbackTx(ctx, dtid, mmShard, rmShards, session.logging)
case Commit2pcStartCommit:
// Failed to store the commit decision on MM.
// If the failure state is certain, then the only option is to rollback the prepared transactions on the RMs.
if startCommitState == querypb.StartCommitState_Fail {
rollbackErr = txc.rollbackTx(ctx, dtid, mmShard, rmShards, session.logging)
}
case Commit2pcStartCommit, Commit2pcPrepareCommit:
fallthrough
case Commit2pcPrepareCommit:
commitUnresolved.Add(1)
}
if rollbackErr != nil {
log.Warningf("Rollback failed after %s failure: %v", phaseMessage[txPhase], rollbackErr)
commitUnresolved.Add(1)
}

session.RecordWarning(&querypb.QueryWarning{
Code: uint32(sqlerror.ERInAtomicRecovery),
Message: createWarningMessage(dtid, txPhase)})
Expand Down
38 changes: 26 additions & 12 deletions go/vt/vtgate/tx_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,9 +994,7 @@ func TestTxConnCommit2PCCreateTransactionFail(t *testing.T) {
sbc0.MustFailCreateTransaction = 1
session.TransactionMode = vtgatepb.TransactionMode_TWOPC
err := sc.txConn.Commit(ctx, session)
want := "error: err"
require.Error(t, err)
assert.Contains(t, err.Error(), want, "Commit")
require.ErrorContains(t, err, "target: TestTxConnCommit2PCCreateTransactionFail.0.primary: error: err")
assert.EqualValues(t, 1, sbc0.CreateTransactionCount.Load(), "sbc0.CreateTransactionCount")
assert.EqualValues(t, 1, sbc0.RollbackCount.Load(), "sbc0.RollbackCount")
assert.EqualValues(t, 1, sbc1.RollbackCount.Load(), "sbc1.RollbackCount")
Expand All @@ -1018,9 +1016,7 @@ func TestTxConnCommit2PCPrepareFail(t *testing.T) {
sbc1.MustFailPrepare = 1
session.TransactionMode = vtgatepb.TransactionMode_TWOPC
err := sc.txConn.Commit(ctx, session)
want := "error: err"
require.Error(t, err)
assert.Contains(t, err.Error(), want, "Commit")
require.ErrorContains(t, err, "target: TestTxConnCommit2PCPrepareFail.1.primary: error: err")
assert.EqualValues(t, 1, sbc0.CreateTransactionCount.Load(), "sbc0.CreateTransactionCount")
assert.EqualValues(t, 1, sbc1.PrepareCount.Load(), "sbc1.PrepareCount")
// Prepare failed on RM, so no commit on MM or RMs.
Expand All @@ -1046,13 +1042,33 @@ func TestTxConnCommit2PCStartCommitFail(t *testing.T) {
sbc0.MustFailStartCommit = 1
session.TransactionMode = vtgatepb.TransactionMode_TWOPC
err := sc.txConn.Commit(ctx, session)
want := "error: err"
require.Error(t, err)
assert.Contains(t, err.Error(), want, "Commit")
require.ErrorContains(t, err, "target: TestTxConnCommit2PCStartCommitFail.0.primary: error: err")
assert.EqualValues(t, 1, sbc0.CreateTransactionCount.Load(), "sbc0.CreateTransactionCount")
assert.EqualValues(t, 1, sbc1.PrepareCount.Load(), "sbc1.PrepareCount")
assert.EqualValues(t, 1, sbc0.StartCommitCount.Load(), "sbc0.StartCommitCount")
assert.EqualValues(t, 0, sbc1.CommitPreparedCount.Load(), "sbc1.CommitPreparedCount")
assert.EqualValues(t, 1, sbc0.SetRollbackCount.Load(), "MM")
assert.EqualValues(t, 1, sbc1.RollbackPreparedCount.Load(), "RM")
assert.EqualValues(t, 1, sbc0.ConcludeTransactionCount.Load(), "sbc0.ConcludeTransactionCount")

sbc0.ResetCounter()
sbc1.ResetCounter()

session = NewSafeSession(&vtgatepb.Session{InTransaction: true})
sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{})
sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false, nullResultsObserver{})

// Here the StartCommit failure leaves the state uncertain, so neither rollback nor conclude is called.
sbc0.MustFailStartCommitUncertain = 1
session.TransactionMode = vtgatepb.TransactionMode_TWOPC
err = sc.txConn.Commit(ctx, session)
require.ErrorContains(t, err, "target: TestTxConnCommit2PCStartCommitFail.0.primary: uncertain error")
assert.EqualValues(t, 1, sbc0.CreateTransactionCount.Load(), "sbc0.CreateTransactionCount")
assert.EqualValues(t, 1, sbc1.PrepareCount.Load(), "sbc1.PrepareCount")
assert.EqualValues(t, 1, sbc0.StartCommitCount.Load(), "sbc0.StartCommitCount")
assert.EqualValues(t, 0, sbc1.CommitPreparedCount.Load(), "sbc1.CommitPreparedCount")
assert.EqualValues(t, 0, sbc0.SetRollbackCount.Load(), "MM")
assert.EqualValues(t, 0, sbc1.RollbackPreparedCount.Load(), "RM")
assert.EqualValues(t, 0, sbc0.ConcludeTransactionCount.Load(), "sbc0.ConcludeTransactionCount")
}

Expand All @@ -1068,9 +1084,7 @@ func TestTxConnCommit2PCCommitPreparedFail(t *testing.T) {
sbc1.MustFailCommitPrepared = 1
session.TransactionMode = vtgatepb.TransactionMode_TWOPC
err := sc.txConn.Commit(ctx, session)
want := "error: err"
require.Error(t, err)
assert.Contains(t, err.Error(), want, "Commit")
require.ErrorContains(t, err, "target: TestTxConnCommit2PCCommitPreparedFail.1.primary: error: err")
assert.EqualValues(t, 1, sbc0.CreateTransactionCount.Load(), "sbc0.CreateTransactionCount")
assert.EqualValues(t, 1, sbc1.PrepareCount.Load(), "sbc1.PrepareCount")
assert.EqualValues(t, 1, sbc0.StartCommitCount.Load(), "sbc0.StartCommitCount")
Expand Down
40 changes: 33 additions & 7 deletions go/vt/vttablet/sandboxconn/sandboxconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,14 @@ type SandboxConn struct {

// These errors are triggered only for specific functions.
// For now these are just for the 2PC functions.
MustFailPrepare int
MustFailCommitPrepared int
MustFailRollbackPrepared int
MustFailCreateTransaction int
MustFailStartCommit int
MustFailSetRollback int
MustFailConcludeTransaction int
MustFailPrepare int
MustFailCommitPrepared int
MustFailRollbackPrepared int
MustFailCreateTransaction int
MustFailStartCommit int
MustFailStartCommitUncertain int
MustFailSetRollback int
MustFailConcludeTransaction int
// MustFailExecute is keyed by the statement type and stores the number
// of times to fail when it sees that statement type.
// Once exhausted, it will start returning a non-error response.
Expand Down Expand Up @@ -157,6 +158,27 @@ func NewSandboxConn(t *topodatapb.Tablet) *SandboxConn {
}
}

// ResetCounter resets the counters in the sandboxconn.
// It stores zero into every call counter listed below, which lets a test reuse
// the same SandboxConn for a second scenario and assert on fresh counts.
// Only the *Count fields are reset; the MustFail* failure-injection fields are
// not modified here.
func (sbc *SandboxConn) ResetCounter() {
// Query execution and plain transaction counters.
sbc.ExecCount.Store(0)
sbc.BeginCount.Store(0)
sbc.CommitCount.Store(0)
sbc.RollbackCount.Store(0)
sbc.AsTransactionCount.Store(0)
// 2PC-related counters.
sbc.PrepareCount.Store(0)
sbc.CommitPreparedCount.Store(0)
sbc.RollbackPreparedCount.Store(0)
sbc.CreateTransactionCount.Store(0)
sbc.StartCommitCount.Store(0)
sbc.SetRollbackCount.Store(0)
sbc.ConcludeTransactionCount.Store(0)
sbc.ReadTransactionCount.Store(0)
sbc.UnresolvedTransactionsCount.Store(0)
// Reserved connection and schema counters.
sbc.ReserveCount.Store(0)
sbc.ReleaseCount.Store(0)
sbc.GetSchemaCount.Store(0)
}

// RequireQueriesLocking sets the sandboxconn to require locking the access of Queries field.
func (sbc *SandboxConn) RequireQueriesLocking() {
sbc.queriesRequireLocking = true
Expand Down Expand Up @@ -411,6 +433,10 @@ func (sbc *SandboxConn) StartCommit(context.Context, *querypb.Target, int64, str
sbc.MustFailStartCommit--
return querypb.StartCommitState_Fail, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "error: err")
}
if sbc.MustFailStartCommitUncertain > 0 {
sbc.MustFailStartCommitUncertain--
return querypb.StartCommitState_Unknown, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "uncertain error")
}
err = sbc.getError()
if err != nil {
return querypb.StartCommitState_Unknown, err
Expand Down

0 comments on commit cc8add3

Please sign in to comment.