Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session: track LastCommitTS in SessionVars and check timestamps of later txns are larger #57305

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/kv/interface_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ func (t *mockTxn) StartTS() uint64 {
return uint64(0)
}

func (t *mockTxn) CommitTS() uint64 {
return 0
}

func (t *mockTxn) Get(ctx context.Context, k Key) ([]byte, error) {
return nil, nil
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ type Transaction interface {
IsReadOnly() bool
// StartTS returns the transaction start timestamp.
StartTS() uint64
// CommitTS returns the transaction commit timestamp if it is committed; otherwise it returns 0.
CommitTS() uint64
// Valid returns if the transaction is valid.
// A transaction become invalid after commit or rollback.
Valid() bool
Expand Down
13 changes: 13 additions & 0 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,19 @@ func (s *session) CommitTxn(ctx context.Context) error {
s.sessionVars.StmtCtx.MergeExecDetails(nil, commitDetail)
}

if err == nil && s.txn.lastCommitTS > 0 {
// lastCommitTS could be the same, e.g. when the txn is considered readonly
if s.txn.lastCommitTS < s.sessionVars.LastCommitTS {
logutil.BgLogger().Panic("check lastCommitTS failed",
zap.Uint64("sessionLastCommitTS", s.sessionVars.LastCommitTS),
zap.Uint64("txnLastCommitTS", s.txn.lastCommitTS),
zap.String("sql", redact.String(s.sessionVars.EnableRedactLog, s.sessionVars.StmtCtx.OriginalSQL)),
)
} else {
s.sessionVars.LastCommitTS = s.txn.lastCommitTS
}
}

// record the TTLInsertRows in the metric
metrics.TTLInsertRowsCount.Add(float64(s.sessionVars.TxnCtx.InsertTTLRowsCount))

Expand Down
3 changes: 2 additions & 1 deletion pkg/session/test/txn/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 10,
shard_count = 11,
deps = [
"//pkg/config",
"//pkg/kv",
Expand All @@ -22,6 +22,7 @@ go_test(
"//pkg/util/dbterror/plannererrors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_goleak//:goleak",
],
Expand Down
14 changes: 14 additions & 0 deletions pkg/session/test/txn/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

// TestAutocommit . See https://dev.mysql.com/doc/internals/en/status-flags.html
Expand Down Expand Up @@ -509,6 +510,19 @@ func TestInTrans(t *testing.T) {
require.False(t, txn.Valid())
}

func TestCommitTSOrderCheck(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(id int)")
ts := oracle.GoTimeToTS(time.Now().Add(time.Minute))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/session/mockFutureCommitTS", fmt.Sprintf("return(%d)", ts)))
tk.MustExec("insert into t values(123)")
require.Panics(t, func() {
tk.Exec("select * from t")
})
}

func TestMemBufferSnapshotRead(t *testing.T) {
store := testkit.CreateMockStore(t)

Expand Down
14 changes: 13 additions & 1 deletion pkg/session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ type LazyTxn struct {

// mark the txn enables lazy uniqueness check in pessimistic transactions.
lazyUniquenessCheckEnabled bool

// commit ts of the last successful transaction, to ensure ordering of TS
lastCommitTS uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field seems to overlap with the newly added CommitTS in the KV interface. Additionally, LazyTxn is a structure that is no longer needed once a transaction is completed. Or just retain only one of them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because kv.Transaction will be set to nil in changeToInvalid in LazyTxn.Commit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about returning the commitTS of before changeToInvalid, like

- func (s *session) doCommit(ctx context.Context) (error) {
+ func (s *session) doCommit(ctx context.Context) (uint64, error) {
	if !s.txn.Valid() {
		return nil
	}

	defer func() {
        // Return the commitTS if error is nil
		s.txn.changeToInvalid()
		s.sessionVars.SetInTxn(false)
		s.sessionVars.ClearDiskFullOpt()
	}()
   ...
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the reason is CommitTS has to be saved until the next transaction. If we change doCommit to return CommitTS, we have to,

  • save CommitTS in session
  • check if the transaction is committed successfully in doCommit
    Since we have to save to save CommitTS, it would be simpler to save CommitTS inside doCommit.

Copy link
Contributor

@you06 you06 Dec 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we store commit ts in doCommit?

+func (s *session) doCommit(ctx context.Context) (err error) {
	if !s.txn.Valid() {
		return nil
	}

	defer func() {
+		if err == nil {
+			s.sessionVars.LastCommitTS = s.txn.CommitTS()
+		}
		s.txn.changeToInvalid()
		s.sessionVars.SetInTxn(false)
		s.sessionVars.ClearDiskFullOpt()
	}()
	...
}

}

// GetTableInfo returns the cached index name.
Expand Down Expand Up @@ -431,7 +434,16 @@ func (txn *LazyTxn) Commit(ctx context.Context) error {
}
})

return txn.Transaction.Commit(ctx)
err := txn.Transaction.Commit(ctx)
if err == nil {
txn.lastCommitTS = txn.Transaction.CommitTS()
failpoint.Inject("mockFutureCommitTS", func(val failpoint.Value) {
if ts, ok := val.(int); ok {
txn.lastCommitTS = uint64(ts)
}
})
}
return err
}

// Rollback overrides the Transaction interface.
Expand Down
3 changes: 3 additions & 0 deletions pkg/sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,9 @@ type SessionVars struct {
// SnapshotTS is used for reading history data. For simplicity, SnapshotTS only supports distsql request.
SnapshotTS uint64

// LastCommitTS is the commit_ts of the last successful transaction in this session.
LastCommitTS uint64

// TxnReadTS is used for staleness transaction, it provides next staleness transaction startTS.
TxnReadTS *TxnReadTS

Expand Down
1 change: 1 addition & 0 deletions pkg/sessiontxn/isolation/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_library(
"//pkg/table/temptable",
"//pkg/tablecodec",
"//pkg/util/logutil",
"//pkg/util/redact",
"//pkg/util/tableutil",
"//pkg/util/tracing",
"@com_github_pingcap_errors//:errors",
Expand Down
16 changes: 16 additions & 0 deletions pkg/sessiontxn/isolation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@ import (
"github.com/pingcap/tidb/pkg/store/driver/txn"
"github.com/pingcap/tidb/pkg/table/temptable"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/redact"
"github.com/pingcap/tidb/pkg/util/tableutil"
"github.com/pingcap/tidb/pkg/util/tracing"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/txnkv/transaction"
"go.uber.org/zap"
)

// baseTxnContextProvider is a base class for the transaction context providers that implement `TxnContextProvider` in different isolation.
Expand Down Expand Up @@ -268,6 +271,10 @@ func (p *baseTxnContextProvider) getTxnStartTS() (uint64, error) {
return txn.StartTS(), nil
}

func (p *baseTxnContextProvider) usePresetStartTS() bool {
return p.constStartTS != 0 || p.sctx.GetSessionVars().SnapshotTS != 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this: SnapshotTS != 0 should imply a staleness txn ctx provider? It won't hurt though

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure either. That's why I named it Preset. The point is we don't check StartTS if it is preset.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ekexium @b6g

Here, considering various historical features (e.g., tidb_snapshot, stale read, etc.), careful attention is required to ensure accurate and comprehensive condition checks. Any oversight could lead to unexpected tidb-server panics.

An alternative approach is to reverse the logic—confirming that the current transaction is activated by a PD-allocated timestamp before proceeding with the check. This seems to be a more reliable method.

like

- if !usePresetStartTS
+ if activiatedByPDAllocatedTS

}

// ActivateTxn activates the transaction and set the relevant context variables.
func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) {
if p.txn != nil {
Expand Down Expand Up @@ -304,6 +311,15 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) {
sessVars.SetInTxn(true)
}

// verify start_ts is later than any previous commit_ts in the session
dveeden marked this conversation as resolved.
Show resolved Hide resolved
if !p.usePresetStartTS() && sessVars.LastCommitTS > 0 && sessVars.LastCommitTS > sessVars.TxnCtx.StartTS {
logutil.BgLogger().Panic("check session lastCommitTS failed",
zap.Uint64("lastCommitTS", sessVars.LastCommitTS),
zap.Uint64("startTS", sessVars.TxnCtx.StartTS),
zap.String("sql", redact.String(sessVars.EnableRedactLog, sessVars.StmtCtx.OriginalSQL)),
)
}

txn.SetVars(sessVars.KVVars)

p.SetOptionsOnTxnActive(txn)
Expand Down