diff --git a/pkg/kv/interface_mock_test.go b/pkg/kv/interface_mock_test.go index d0241b9fd732d..7930806b0b544 100644 --- a/pkg/kv/interface_mock_test.go +++ b/pkg/kv/interface_mock_test.go @@ -75,6 +75,10 @@ func (t *mockTxn) StartTS() uint64 { return uint64(0) } +func (t *mockTxn) CommitTS() uint64 { + return 0 +} + func (t *mockTxn) Get(ctx context.Context, k Key) ([]byte, error) { return nil, nil } diff --git a/pkg/kv/kv.go b/pkg/kv/kv.go index b2ad11cb33dec..bfe3bbd86f5f1 100644 --- a/pkg/kv/kv.go +++ b/pkg/kv/kv.go @@ -251,6 +251,8 @@ type Transaction interface { IsReadOnly() bool // StartTS returns the transaction start timestamp. StartTS() uint64 + // CommitTS returns the transaction commit timestamp. + CommitTS() uint64 // Valid returns if the transaction is valid. // A transaction become invalid after commit or rollback. Valid() bool diff --git a/pkg/session/session.go b/pkg/session/session.go index f381afdab8632..489dd0e2d43d2 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -929,6 +929,21 @@ func (s *session) CommitTxn(ctx context.Context) error { s.sessionVars.StmtCtx.MergeExecDetails(nil, commitDetail) } + if err == nil { + // save CommitTS in sessionVars for invariant check + // TODO: enable LastCommitTS with a session variable + if ts := s.txn.CommitTS(); ts > 0 { + if ts <= s.sessionVars.LastCommitTS { + logutil.BgLogger().Fatal("check lastCommitTS failed", + zap.Uint64("lastCommitTS", s.sessionVars.LastCommitTS), + zap.Uint64("CommitTS", ts), + ) + } else { + s.sessionVars.LastCommitTS = ts + } + } + } + // record the TTLInsertRows in the metric metrics.TTLInsertRowsCount.Add(float64(s.sessionVars.TxnCtx.InsertTTLRowsCount)) diff --git a/pkg/session/txn.go b/pkg/session/txn.go index b65a969b28ea0..6ce87293509e0 100644 --- a/pkg/session/txn.go +++ b/pkg/session/txn.go @@ -231,6 +231,13 @@ func (txn *LazyTxn) validOrPending() bool { return txn.txnFuture != nil || txn.Valid() } +func (txn *LazyTxn) CommitTS() uint64 { + if txn.Transaction != nil { + return txn.Transaction.CommitTS() + } + return 0 +} + func (txn *LazyTxn) String() string { if txn.Transaction != nil { return txn.Transaction.String() diff --git a/pkg/sessionctx/variable/session.go b/pkg/sessionctx/variable/session.go index 29d304d133403..87e1219e9729c 100644 --- a/pkg/sessionctx/variable/session.go +++ b/pkg/sessionctx/variable/session.go @@ -888,6 +888,9 @@ type SessionVars struct { // SnapshotTS is used for reading history data. For simplicity, SnapshotTS only supports distsql request. SnapshotTS uint64 + // LastCommitTS is the commit_ts of the last transaction in this session. + LastCommitTS uint64 + // TxnReadTS is used for staleness transaction, it provides next staleness transaction startTS. TxnReadTS *TxnReadTS diff --git a/pkg/sessiontxn/isolation/base.go b/pkg/sessiontxn/isolation/base.go index 29207b1ef42f5..7e932dbdc2301 100644 --- a/pkg/sessiontxn/isolation/base.go +++ b/pkg/sessiontxn/isolation/base.go @@ -34,11 +34,13 @@ import ( "github.com/pingcap/tidb/pkg/store/driver/txn" "github.com/pingcap/tidb/pkg/table/temptable" "github.com/pingcap/tidb/pkg/tablecodec" + "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/tableutil" "github.com/pingcap/tidb/pkg/util/tracing" tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/txnkv/transaction" + "go.uber.org/zap" ) // baseTxnContextProvider is a base class for the transaction context providers that implement `TxnContextProvider` in different isolation. @@ -304,6 +306,14 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) { sessVars.SetInTxn(true) } + // verify start_ts is later than any previous commit_ts in the session + if sessVars.LastCommitTS > 0 && sessVars.LastCommitTS > sessVars.TxnCtx.StartTS { + logutil.BgLogger().Fatal("check lastCommitTS failed", + zap.Uint64("lastCommitTS", sessVars.LastCommitTS), + zap.Uint64("startTS", sessVars.TxnCtx.StartTS), + ) + } + txn.SetVars(sessVars.KVVars) p.SetOptionsOnTxnActive(txn)