Skip to content

Commit

Permalink
entry (ticdc): speed up schema snapshot initialization (#11217) (#11437)
Browse files Browse the repository at this point in the history
close #11207
  • Loading branch information
ti-chi-bot authored Aug 5, 2024
1 parent 4ef1810 commit a3aa5c0
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 43 deletions.
66 changes: 37 additions & 29 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync"
"time"

"github.com/goccy/go-json"
"github.com/google/btree"
"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand All @@ -31,6 +32,10 @@ import (
"go.uber.org/zap"
)

const (
mTablePrefix = "Table"
)

// Snapshot stores all schema information of the source TiDB.
// Unless otherwise noted, all public methods are thread-safe.
type Snapshot struct {
Expand Down Expand Up @@ -122,8 +127,8 @@ func GetSchemaVersion(meta *timeta.Meta) (int64, error) {
return version, nil
}

// NewSingleSnapshotFromMeta creates a new single schema snapshot from a tidb meta
func NewSingleSnapshotFromMeta(
// NewSnapshotFromMeta creates a schema snapshot from meta.
func NewSnapshotFromMeta(
id model.ChangeFeedID,
meta *timeta.Meta,
currentTs uint64,
Expand All @@ -136,28 +141,19 @@ func NewSingleSnapshotFromMeta(
snap.inner.currentTs = currentTs
return snap, nil
}
return NewSnapshotFromMeta(id, meta, currentTs, forceReplicate, filter)
}

// NewSnapshotFromMeta creates a schema snapshot from meta.
func NewSnapshotFromMeta(
id model.ChangeFeedID,
meta *timeta.Meta,
currentTs uint64,
forceReplicate bool,
filter filter.Filter,
) (*Snapshot, error) {
start := time.Now()
snap := NewEmptySnapshot(forceReplicate)
dbinfos, err := meta.ListDatabases()
if err != nil {
return nil, cerror.WrapError(cerror.ErrMetaListDatabases, err)
}
tableCount := 0
// `tag` is used to reverse sort all versions in the generated snapshot.
tag := negative(currentTs)
for _, dbinfo := range dbinfos {
if filter.ShouldIgnoreSchema(dbinfo.Name.O) {
log.Debug("ignore database", zap.String("db", dbinfo.Name.O))
log.Debug("ignore database", zap.Stringer("db", dbinfo.Name), zap.Stringer("changefeed", id))
continue
}
vid := newVersionedID(dbinfo.ID, tag)
Expand All @@ -167,31 +163,41 @@ func NewSnapshotFromMeta(
vname := newVersionedEntityName(-1, dbinfo.Name.O, tag) // -1 means the entity is a schema.
vname.target = dbinfo.ID
snap.inner.schemaNameToID.ReplaceOrInsert(vname)
// get all tables Name
tableNames, err := meta.ListSimpleTables(dbinfo.ID)

rawTables, err := meta.GetMetasByDBID(dbinfo.ID)
if err != nil {
return nil, cerror.WrapError(cerror.ErrMetaListDatabases, err)
}
tableNeeded := make([]*timodel.TableNameInfo, 0, len(tableNames))
// filter tables
for _, table := range tableNames {
if filter.ShouldIgnoreTable(dbinfo.Name.O, table.Name.O) {
log.Debug("ignore table", zap.String("table", table.Name.O))
tableInfos := make([]*timodel.TableInfo, 0, len(rawTables)/2)
for _, r := range rawTables {
tableKey := string(r.Field)
if !strings.HasPrefix(tableKey, mTablePrefix) {
continue
}
tableNeeded = append(tableNeeded, table)
}
tableInfos := make([]*timodel.TableInfo, 0, len(tableNeeded))
for _, table := range tableNeeded {
tableInfo, err := meta.GetTable(dbinfo.ID, table.ID)

tbName := &timodel.TableNameInfo{}
err := json.Unmarshal(r.Value, tbName)
if err != nil {
return nil, errors.Trace(err)
}

if filter.ShouldIgnoreTable(dbinfo.Name.O, tbName.Name.O) {
log.Debug("ignore table", zap.String("db", dbinfo.Name.O),
zap.String("table", tbName.Name.O))
continue
}

tbInfo := &timodel.TableInfo{}
err = json.Unmarshal(r.Value, tbInfo)
if err != nil {
return nil, errors.Trace(err)
}
tableInfos = append(tableInfos, tableInfo)
tableInfos = append(tableInfos, tbInfo)
}

for _, tableInfo := range tableInfos {
tableInfo := model.WrapTableInfo(dbinfo.ID, dbinfo.Name.O, currentTs, tableInfo)
tableCount++
snap.inner.tables.ReplaceOrInsert(versionedID{
id: tableInfo.ID,
tag: tag,
Expand All @@ -204,25 +210,27 @@ func NewSnapshotFromMeta(
target: tableInfo.ID,
})

ineligible := !tableInfo.IsEligible(forceReplicate)
if ineligible {
eligible := tableInfo.IsEligible(forceReplicate)
if !eligible {
snap.inner.ineligibleTables.ReplaceOrInsert(versionedID{id: tableInfo.ID, tag: tag})
}
if pi := tableInfo.GetPartitionInfo(); pi != nil {
for _, partition := range pi.Definitions {
vid := newVersionedID(partition.ID, tag)
vid.target = tableInfo
snap.inner.partitions.ReplaceOrInsert(vid)
if ineligible {
if !eligible {
snap.inner.ineligibleTables.ReplaceOrInsert(versionedID{id: partition.ID, tag: tag})
}
}
}
}
}

snap.inner.currentTs = currentTs
log.Info("schema snapshot created",
zap.Stringer("changefeed", id),
zap.Int("tables", tableCount),
zap.Uint64("currentTs", currentTs),
zap.Any("duration", time.Since(start).Seconds()))
return snap, nil
Expand Down
44 changes: 35 additions & 9 deletions cdc/entry/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,42 +92,68 @@ func TestAllPhysicalTables(t *testing.T) {
require.Equal(t, tableIDs, expectedTableIDs)
}

func TestAllTables(t *testing.T) {
func TestNewSchemaStorage(t *testing.T) {
helper := NewSchemaTestHelper(t)
defer helper.Close()
cfg := config.GetDefaultReplicaConfig()
cfg.Filter.Rules = []string{"test.t1"}
f, err := filter.NewFilter(cfg, "")
require.Nil(t, err)

// add tables before creating the schema storage
job1 := helper.DDL2Job("create table test.t1 (id int primary key)")
helper.DDL2Job("create table test.t2 (id int primary key)")
ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope)
require.Nil(t, err)
schema, err := NewSchemaStorage(helper.Storage(), ver.Ver,
false, dummyChangeFeedID, util.RoleTester, f)
require.Nil(t, err)
require.NotNil(t, schema)
tableInfos, err := schema.AllTables(context.Background(), ver.Ver)
require.Nil(t, err)
require.Len(t, tableInfos, 1)
require.Equal(t, job1.BinlogInfo.TableInfo.Name.O, tableInfos[0].TableName.Table)
}

func TestAllTables(t *testing.T) {
helper := NewSchemaTestHelper(t)
defer helper.Close()
f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "")
require.Nil(t, err)
// add a table before creating the schema storage
job := helper.DDL2Job("create table test.dongment (id int primary key)")
ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope)
require.Nil(t, err)
schema, err := NewSchemaStorage(helper.Storage(), ver.Ver,
false, dummyChangeFeedID, util.RoleTester, f)
require.Nil(t, err)
tableInfos, err := schema.AllTables(context.Background(), ver.Ver)
require.Nil(t, err)
require.Len(t, tableInfos, 0)
require.Len(t, tableInfos, 1)
require.Equal(t, job.BinlogInfo.TableInfo.Name.O, tableInfos[0].TableName.Table)
// add normal table
job := helper.DDL2Job("create table test.t1(id int primary key)")
job = helper.DDL2Job("create table test.t1(id int primary key)")
require.Nil(t, schema.HandleDDLJob(job))
tableInfos, err = schema.AllTables(context.Background(), job.BinlogInfo.FinishedTS)
require.Nil(t, err)
require.Len(t, tableInfos, 1)
tableName := tableInfos[0].TableName
require.Len(t, tableInfos, 2)
tableName := tableInfos[1].TableName
require.Equal(t, model.TableName{
Schema: "test",
Table: "t1",
TableID: 104,
TableID: 106,
}, tableName)
// add ineligible table
job = helper.DDL2Job("create table test.t2(id int)")
require.Nil(t, schema.HandleDDLJob(job))
tableInfos, err = schema.AllTables(context.Background(), job.BinlogInfo.FinishedTS)
require.Nil(t, err)
require.Len(t, tableInfos, 1)
tableName = tableInfos[0].TableName
require.Len(t, tableInfos, 2)
tableName = tableInfos[1].TableName
require.Equal(t, model.TableName{
Schema: "test",
Table: "t1",
TableID: 104,
TableID: 106,
}, tableName)
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/entry/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func VerifyTables(
err error,
) {
meta := kv.GetSnapshotMeta(storage, startTs)
snap, err := schema.NewSingleSnapshotFromMeta(
snap, err := schema.NewSnapshotFromMeta(
model.ChangeFeedID4Test("api", "verifyTable"),
meta, startTs, false /* forceReplicate */, f)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/kvproto v0.0.0-20240227073058-929ab83f9754
github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d
github.com/pingcap/tidb v1.1.0-beta.0.20240801132827-888a58b0064b
github.com/pingcap/tidb v1.1.0-beta.0.20240802042051-abb56db84e19
github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7
github.com/pingcap/tidb-tools v0.0.0-20240305021104-9f9bea84490b
github.com/pingcap/tidb/pkg/parser v0.0.0-20240801132827-888a58b0064b
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -815,8 +815,8 @@ github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d h1:y3EueKVfVykdpTyfU
github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA=
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 h1:T4pXRhBflzDeAhmOQHNPRRogMYxP13V7BkYw3ZsoSfE=
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5/go.mod h1:rlimy0GcTvjiJqvD5mXTRr8O2eNZPBrcUgiWVYp9530=
github.com/pingcap/tidb v1.1.0-beta.0.20240801132827-888a58b0064b h1:SxypIc3So2jYPlrq1GaD8/grclanY25D/nytYqV1BYw=
github.com/pingcap/tidb v1.1.0-beta.0.20240801132827-888a58b0064b/go.mod h1:1b7fS/qABhsFxohbeF5vR1V7BuWqWMlAPXIi41TX3/Q=
github.com/pingcap/tidb v1.1.0-beta.0.20240802042051-abb56db84e19 h1:wy7GKUrK4G7vTcA7vEUmJRk0tkuj81PBnZuRzRIvj20=
github.com/pingcap/tidb v1.1.0-beta.0.20240802042051-abb56db84e19/go.mod h1:1b7fS/qABhsFxohbeF5vR1V7BuWqWMlAPXIi41TX3/Q=
github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7 h1:eFu98FbfJB7PKWOtkaV6YNXXJWqDhczQX56j/iucgU4=
github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c=
github.com/pingcap/tidb-tools v0.0.0-20240305021104-9f9bea84490b h1:UC4lLT2OBHGImFJbiXQfIxIck7AoFLIiRJ6Eo6uFVaw=
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests/http_api_tls/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ function run() {
# wait for above sql done in the up source
sleep 2

check_table_exists test.simple1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists test.verify_table_eligible ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

sequential_cases=(
"list_changefeed"
Expand Down

0 comments on commit a3aa5c0

Please sign in to comment.