From 55b7273baaf2d68252793500580937a1dd9470cb Mon Sep 17 00:00:00 2001 From: Bin Zhang Date: Mon, 18 Nov 2024 16:36:04 -0800 Subject: [PATCH 1/7] session: track LastCommitTS in SessionVars and check StartTS of a txn is larger --- pkg/kv/interface_mock_test.go | 4 ++++ pkg/kv/kv.go | 2 ++ pkg/session/session.go | 13 +++++++++++++ pkg/session/txn.go | 9 ++++++++- pkg/sessionctx/variable/session.go | 3 +++ pkg/sessiontxn/isolation/base.go | 11 +++++++++++ 6 files changed, 41 insertions(+), 1 deletion(-) diff --git a/pkg/kv/interface_mock_test.go b/pkg/kv/interface_mock_test.go index fea4bd2b1f7bc..5b56ee8361a0b 100644 --- a/pkg/kv/interface_mock_test.go +++ b/pkg/kv/interface_mock_test.go @@ -75,6 +75,10 @@ func (t *mockTxn) StartTS() uint64 { return uint64(0) } +func (t *mockTxn) CommitTS() uint64 { + return 0 +} + func (t *mockTxn) Get(ctx context.Context, k Key) ([]byte, error) { return nil, nil } diff --git a/pkg/kv/kv.go b/pkg/kv/kv.go index 92255dbabf822..8ad07be825c07 100644 --- a/pkg/kv/kv.go +++ b/pkg/kv/kv.go @@ -251,6 +251,8 @@ type Transaction interface { IsReadOnly() bool // StartTS returns the transaction start timestamp. StartTS() uint64 + // CommitTS returns the transaction commit timestamp. + CommitTS() uint64 // Valid returns if the transaction is valid. // A transaction become invalid after commit or rollback. Valid() bool diff --git a/pkg/session/session.go b/pkg/session/session.go index e3770a7772d97..d49a4576acb15 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -927,6 +927,19 @@ func (s *session) CommitTxn(ctx context.Context) error { s.sessionVars.StmtCtx.MergeExecDetails(nil, commitDetail) } + if err == nil && s.txn.lastCommitTS > 0 { + // lastCommitTS could be the same, e.g. when the txn is considered readonly + if s.txn.lastCommitTS < s.sessionVars.LastCommitTS { + logutil.BgLogger().Fatal("check lastCommitTS failed", + zap.Uint64("sessionLastCommitTS", s.sessionVars.LastCommitTS), + zap.Uint64("txnLastCommitTS", s.txn.lastCommitTS), + zap.String("sql", s.sessionVars.StmtCtx.OriginalSQL), + ) + } else { + s.sessionVars.LastCommitTS = s.txn.lastCommitTS + } + } + // record the TTLInsertRows in the metric metrics.TTLInsertRowsCount.Add(float64(s.sessionVars.TxnCtx.InsertTTLRowsCount)) diff --git a/pkg/session/txn.go b/pkg/session/txn.go index cc65134611b00..ec97b13e530b9 100644 --- a/pkg/session/txn.go +++ b/pkg/session/txn.go @@ -70,6 +70,9 @@ type LazyTxn struct { // mark the txn enables lazy uniqueness check in pessimistic transactions. lazyUniquenessCheckEnabled bool + + // commit ts of the last successful transaction, to ensure ordering of TS + lastCommitTS uint64 } // GetTableInfo returns the cached index name. @@ -431,7 +434,11 @@ func (txn *LazyTxn) Commit(ctx context.Context) error { } }) - return txn.Transaction.Commit(ctx) + err := txn.Transaction.Commit(ctx) + if err == nil { + txn.lastCommitTS = txn.Transaction.CommitTS() + } + return err } // Rollback overrides the Transaction interface. diff --git a/pkg/sessionctx/variable/session.go b/pkg/sessionctx/variable/session.go index 6863d9b838dea..d6500986bc4cb 100644 --- a/pkg/sessionctx/variable/session.go +++ b/pkg/sessionctx/variable/session.go @@ -866,6 +866,9 @@ type SessionVars struct { // SnapshotTS is used for reading history data. For simplicity, SnapshotTS only supports distsql request. SnapshotTS uint64 + // LastCommitTS is the commit_ts of the last successful transaction in this session. + LastCommitTS uint64 + // TxnReadTS is used for staleness transaction, it provides next staleness transaction startTS. TxnReadTS *TxnReadTS diff --git a/pkg/sessiontxn/isolation/base.go b/pkg/sessiontxn/isolation/base.go index b3a2c6dd42dcc..dfc71def4bf10 100644 --- a/pkg/sessiontxn/isolation/base.go +++ b/pkg/sessiontxn/isolation/base.go @@ -35,11 +35,13 @@ import ( "github.com/pingcap/tidb/pkg/store/driver/txn" "github.com/pingcap/tidb/pkg/table/temptable" "github.com/pingcap/tidb/pkg/tablecodec" + "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/tableutil" "github.com/pingcap/tidb/pkg/util/tracing" tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/txnkv/transaction" + "go.uber.org/zap" ) // baseTxnContextProvider is a base class for the transaction context providers that implement `TxnContextProvider` in different isolation. @@ -305,6 +307,15 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) { sessVars.SetInTxn(true) } + // verify start_ts is later than any previous commit_ts in the session + if sessVars.LastCommitTS > 0 && sessVars.LastCommitTS > sessVars.TxnCtx.StartTS { + logutil.BgLogger().Fatal("check session lastCommitTS failed", + zap.Uint64("lastCommitTS", sessVars.LastCommitTS), + zap.Uint64("startTS", sessVars.TxnCtx.StartTS), + zap.String("sql", sessVars.StmtCtx.OriginalSQL), + ) + } + txn.SetVars(sessVars.KVVars) p.SetOptionsOnTxnActive(txn) From b840c7376fecb7da98bfd23eebb418524574aedb Mon Sep 17 00:00:00 2001 From: Bin Zhang Date: Tue, 26 Nov 2024 23:52:53 -0800 Subject: [PATCH 2/7] test --- pkg/session/session.go | 2 +- pkg/session/test/txn/txn_test.go | 14 ++++++++++++++ pkg/session/txn.go | 5 +++++ pkg/sessiontxn/isolation/base.go | 2 +- 4 files changed, 21 insertions(+), 2 deletions(-) diff --git a/pkg/session/session.go b/pkg/session/session.go index d49a4576acb15..835b1976e021a 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -930,7 +930,7 @@ func (s *session) CommitTxn(ctx context.Context) error { if err == nil && s.txn.lastCommitTS > 0 { // lastCommitTS could be the same, e.g. when the txn is considered readonly if s.txn.lastCommitTS < s.sessionVars.LastCommitTS { - logutil.BgLogger().Fatal("check lastCommitTS failed", + logutil.BgLogger().Panic("check lastCommitTS failed", zap.Uint64("sessionLastCommitTS", s.sessionVars.LastCommitTS), zap.Uint64("txnLastCommitTS", s.txn.lastCommitTS), zap.String("sql", s.sessionVars.StmtCtx.OriginalSQL), diff --git a/pkg/session/test/txn/txn_test.go b/pkg/session/test/txn/txn_test.go index 9a40530c76427..2d1a24aff1fae 100644 --- a/pkg/session/test/txn/txn_test.go +++ b/pkg/session/test/txn/txn_test.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/util/dbterror/plannererrors" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" ) // TestAutocommit . See https://dev.mysql.com/doc/internals/en/status-flags.html @@ -509,6 +510,19 @@ func TestInTrans(t *testing.T) { require.False(t, txn.Valid()) } +func TestCommitTSOrderCheck(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int)") + ts := oracle.GoTimeToTS(time.Now().Add(time.Minute)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/session/mockFutureCommitTS", fmt.Sprintf("return(%d)", ts))) + tk.MustExec("insert into t values(123)") + require.Panics(t, func() { + tk.Exec("select * from t") + }) +} + func TestMemBufferSnapshotRead(t *testing.T) { store := testkit.CreateMockStore(t) diff --git a/pkg/session/txn.go b/pkg/session/txn.go index ec97b13e530b9..b311030c497ac 100644 --- a/pkg/session/txn.go +++ b/pkg/session/txn.go @@ -437,6 +437,11 @@ func (txn *LazyTxn) Commit(ctx context.Context) error { err := txn.Transaction.Commit(ctx) if err == nil { txn.lastCommitTS = txn.Transaction.CommitTS() + failpoint.Inject("mockFutureCommitTS", func(val failpoint.Value) { + if ts, ok := val.(int); ok { + txn.lastCommitTS = uint64(ts) + } + }) } return err } diff --git a/pkg/sessiontxn/isolation/base.go b/pkg/sessiontxn/isolation/base.go index dfc71def4bf10..aaf4a7338cdf1 100644 --- a/pkg/sessiontxn/isolation/base.go +++ b/pkg/sessiontxn/isolation/base.go @@ -309,7 +309,7 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) { // verify start_ts is later than any previous commit_ts in the session if sessVars.LastCommitTS > 0 && sessVars.LastCommitTS > sessVars.TxnCtx.StartTS { - logutil.BgLogger().Fatal("check session lastCommitTS failed", + logutil.BgLogger().Panic("check session lastCommitTS failed", zap.Uint64("lastCommitTS", sessVars.LastCommitTS), zap.Uint64("startTS", sessVars.TxnCtx.StartTS), zap.String("sql", sessVars.StmtCtx.OriginalSQL), From 32b51c32984d2c077ab38d731c70d8836fd8ef96 Mon Sep 17 00:00:00 2001 From: Bin Zhang Date: Sat, 7 Dec 2024 08:26:44 -0800 Subject: [PATCH 3/7] dont check startts if it is preset --- pkg/sessiontxn/isolation/base.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/sessiontxn/isolation/base.go b/pkg/sessiontxn/isolation/base.go index aaf4a7338cdf1..36529bd5a1066 100644 --- a/pkg/sessiontxn/isolation/base.go +++ b/pkg/sessiontxn/isolation/base.go @@ -271,6 +271,10 @@ func (p *baseTxnContextProvider) getTxnStartTS() (uint64, error) { return txn.StartTS(), nil } +func (p *baseTxnContextProvider) usePresetStartTS() bool { + return p.constStartTS != 0 || p.sctx.GetSessionVars().SnapshotTS != 0 +} + // ActivateTxn activates the transaction and set the relevant context variables. func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) { if p.txn != nil { @@ -308,7 +312,7 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) { } // verify start_ts is later than any previous commit_ts in the session - if sessVars.LastCommitTS > 0 && sessVars.LastCommitTS > sessVars.TxnCtx.StartTS { + if !p.usePresetStartTS() && sessVars.LastCommitTS > 0 && sessVars.LastCommitTS > sessVars.TxnCtx.StartTS { logutil.BgLogger().Panic("check session lastCommitTS failed", zap.Uint64("lastCommitTS", sessVars.LastCommitTS), zap.Uint64("startTS", sessVars.TxnCtx.StartTS), From 564e3ba5e714c5753a222887f0a17926a53e7835 Mon Sep 17 00:00:00 2001 From: Bin Zhang Date: Fri, 20 Dec 2024 13:54:26 -0800 Subject: [PATCH 4/7] bazel_prepare --- pkg/session/test/txn/BUILD.bazel | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/session/test/txn/BUILD.bazel b/pkg/session/test/txn/BUILD.bazel index 456c700f3123a..c2de0240d378b 100644 --- a/pkg/session/test/txn/BUILD.bazel +++ b/pkg/session/test/txn/BUILD.bazel @@ -9,7 +9,7 @@ go_test( ], flaky = True, race = "on", - shard_count = 10, + shard_count = 11, deps = [ "//pkg/config", "//pkg/kv", @@ -22,6 +22,7 @@ go_test( "//pkg/util/dbterror/plannererrors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_client_go_v2//tikv", "@org_uber_go_goleak//:goleak", ], From 81645fac9d6989efd87d0addd5fdf3117bba0603 Mon Sep 17 00:00:00 2001 From: Bin Zhang Date: Mon, 23 Dec 2024 21:23:21 -0800 Subject: [PATCH 5/7] redact sql string --- pkg/kv/kv.go | 2 +- pkg/session/session.go | 2 +- pkg/sessiontxn/isolation/base.go | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/kv/kv.go b/pkg/kv/kv.go index 8ad07be825c07..acbfaf84e4538 100644 --- a/pkg/kv/kv.go +++ b/pkg/kv/kv.go @@ -251,7 +251,7 @@ type Transaction interface { IsReadOnly() bool // StartTS returns the transaction start timestamp. StartTS() uint64 - // CommitTS returns the transaction commit timestamp. + // CommitTS returns the transaction commit timestamp if it is committed; otherwise it returns 0. CommitTS() uint64 // Valid returns if the transaction is valid. // A transaction become invalid after commit or rollback. diff --git a/pkg/session/session.go b/pkg/session/session.go index 835b1976e021a..38ac8de7ccdc8 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -933,7 +933,7 @@ func (s *session) CommitTxn(ctx context.Context) error { logutil.BgLogger().Panic("check lastCommitTS failed", zap.Uint64("sessionLastCommitTS", s.sessionVars.LastCommitTS), zap.Uint64("txnLastCommitTS", s.txn.lastCommitTS), - zap.String("sql", s.sessionVars.StmtCtx.OriginalSQL), + zap.String("sql", redact.String(s.sessionVars.EnableRedactLog, s.sessionVars.StmtCtx.OriginalSQL)), ) } else { s.sessionVars.LastCommitTS = s.txn.lastCommitTS diff --git a/pkg/sessiontxn/isolation/base.go b/pkg/sessiontxn/isolation/base.go index 36529bd5a1066..93754d5c490c7 100644 --- a/pkg/sessiontxn/isolation/base.go +++ b/pkg/sessiontxn/isolation/base.go @@ -36,6 +36,7 @@ import ( "github.com/pingcap/tidb/pkg/table/temptable" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/pingcap/tidb/pkg/util/redact" "github.com/pingcap/tidb/pkg/util/tableutil" "github.com/pingcap/tidb/pkg/util/tracing" tikvstore "github.com/tikv/client-go/v2/kv" @@ -316,7 +317,7 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) { logutil.BgLogger().Panic("check session lastCommitTS failed", zap.Uint64("lastCommitTS", sessVars.LastCommitTS), zap.Uint64("startTS", sessVars.TxnCtx.StartTS), - zap.String("sql", sessVars.StmtCtx.OriginalSQL), + zap.String("sql", redact.String(sessVars.EnableRedactLog, sessVars.StmtCtx.OriginalSQL)), ) } From aa03bfd0d490437402d6a22591c59904627b30fd Mon Sep 17 00:00:00 2001 From: Bin Zhang Date: Tue, 24 Dec 2024 15:57:23 -0800 Subject: [PATCH 6/7] bazel_prepare --- pkg/sessiontxn/isolation/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/sessiontxn/isolation/BUILD.bazel b/pkg/sessiontxn/isolation/BUILD.bazel index dc5c8dc08da73..04d9439572a0b 100644 --- a/pkg/sessiontxn/isolation/BUILD.bazel +++ b/pkg/sessiontxn/isolation/BUILD.bazel @@ -32,6 +32,7 @@ go_library( "//pkg/table/temptable", "//pkg/tablecodec", "//pkg/util/logutil", + "//pkg/util/redact", "//pkg/util/tableutil", "//pkg/util/tracing", "@com_github_pingcap_errors//:errors", From c5e3c41b3d0b8e5e7834e74c6bd45980621f87f2 Mon Sep 17 00:00:00 2001 From: Bin Zhang Date: Sat, 25 Jan 2025 14:02:31 -0800 Subject: [PATCH 7/7] return ErrAssertionFailed --- pkg/session/session.go | 3 ++- pkg/session/test/txn/txn_test.go | 5 ++--- pkg/sessiontxn/isolation/base.go | 3 ++- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/session/session.go b/pkg/session/session.go index 38ac8de7ccdc8..83af3482e2dbd 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -930,11 +930,12 @@ func (s *session) CommitTxn(ctx context.Context) error { if err == nil && s.txn.lastCommitTS > 0 { // lastCommitTS could be the same, e.g. when the txn is considered readonly if s.txn.lastCommitTS < s.sessionVars.LastCommitTS { - logutil.BgLogger().Panic("check lastCommitTS failed", + logutil.BgLogger().Error("check lastCommitTS failed", zap.Uint64("sessionLastCommitTS", s.sessionVars.LastCommitTS), zap.Uint64("txnLastCommitTS", s.txn.lastCommitTS), zap.String("sql", redact.String(s.sessionVars.EnableRedactLog, s.sessionVars.StmtCtx.OriginalSQL)), ) + return kv.ErrAssertionFailed } else { s.sessionVars.LastCommitTS = s.txn.lastCommitTS } diff --git a/pkg/session/test/txn/txn_test.go b/pkg/session/test/txn/txn_test.go index 2d1a24aff1fae..bc80b10143424 100644 --- a/pkg/session/test/txn/txn_test.go +++ b/pkg/session/test/txn/txn_test.go @@ -518,9 +518,8 @@ func TestCommitTSOrderCheck(t *testing.T) { ts := oracle.GoTimeToTS(time.Now().Add(time.Minute)) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/session/mockFutureCommitTS", fmt.Sprintf("return(%d)", ts))) tk.MustExec("insert into t values(123)") - require.Panics(t, func() { - tk.Exec("select * from t") - }) + _, err := tk.Exec("select * from t") + require.True(t, kv.ErrAssertionFailed.Equal(err)) } func TestMemBufferSnapshotRead(t *testing.T) { diff --git a/pkg/sessiontxn/isolation/base.go b/pkg/sessiontxn/isolation/base.go index 93754d5c490c7..1362d83611b63 100644 --- a/pkg/sessiontxn/isolation/base.go +++ b/pkg/sessiontxn/isolation/base.go @@ -314,11 +314,12 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) { // verify start_ts is later than any previous commit_ts in the session if !p.usePresetStartTS() && sessVars.LastCommitTS > 0 && sessVars.LastCommitTS > sessVars.TxnCtx.StartTS { - logutil.BgLogger().Panic("check session lastCommitTS failed", + logutil.BgLogger().Error("check session lastCommitTS failed", zap.Uint64("lastCommitTS", sessVars.LastCommitTS), zap.Uint64("startTS", sessVars.TxnCtx.StartTS), zap.String("sql", redact.String(sessVars.EnableRedactLog, sessVars.StmtCtx.OriginalSQL)), ) + return nil, kv.ErrAssertionFailed } txn.SetVars(sessVars.KVVars)