From a3aa5c05d0928e154349826b2b628570d8f4f9df Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 5 Aug 2024 11:42:38 +0800 Subject: [PATCH] entry (ticdc): speed up schema snapshot initialization (#11217) (#11437) close pingcap/tiflow#11207 --- cdc/entry/schema/snapshot.go | 66 ++++++++++++--------- cdc/entry/schema_test.go | 44 +++++++++++--- cdc/entry/validator.go | 2 +- go.mod | 2 +- go.sum | 4 +- tests/integration_tests/http_api_tls/run.sh | 2 +- 6 files changed, 77 insertions(+), 43 deletions(-) diff --git a/cdc/entry/schema/snapshot.go b/cdc/entry/schema/snapshot.go index e01821ef551..71486e1442a 100644 --- a/cdc/entry/schema/snapshot.go +++ b/cdc/entry/schema/snapshot.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "github.com/goccy/go-json" "github.com/google/btree" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -31,6 +32,10 @@ import ( "go.uber.org/zap" ) +const ( + mTablePrefix = "Table" +) + // Snapshot stores the source TiDB all schema information. // If no special comments, all public methods are thread-safe. type Snapshot struct { @@ -122,8 +127,8 @@ func GetSchemaVersion(meta *timeta.Meta) (int64, error) { return version, nil } -// NewSingleSnapshotFromMeta creates a new single schema snapshot from a tidb meta -func NewSingleSnapshotFromMeta( +// NewSnapshotFromMeta creates a schema snapshot from meta. +func NewSnapshotFromMeta( id model.ChangeFeedID, meta *timeta.Meta, currentTs uint64, @@ -136,28 +141,19 @@ func NewSingleSnapshotFromMeta( snap.inner.currentTs = currentTs return snap, nil } - return NewSnapshotFromMeta(id, meta, currentTs, forceReplicate, filter) -} -// NewSnapshotFromMeta creates a schema snapshot from meta. -func NewSnapshotFromMeta( - id model.ChangeFeedID, - meta *timeta.Meta, - currentTs uint64, - forceReplicate bool, - filter filter.Filter, -) (*Snapshot, error) { start := time.Now() snap := NewEmptySnapshot(forceReplicate) dbinfos, err := meta.ListDatabases() if err != nil { return nil, cerror.WrapError(cerror.ErrMetaListDatabases, err) } + tableCount := 0 // `tag` is used to reverse sort all versions in the generated snapshot. tag := negative(currentTs) for _, dbinfo := range dbinfos { if filter.ShouldIgnoreSchema(dbinfo.Name.O) { - log.Debug("ignore database", zap.String("db", dbinfo.Name.O)) + log.Debug("ignore database", zap.Stringer("db", dbinfo.Name), zap.Stringer("changefeed", id)) continue } vid := newVersionedID(dbinfo.ID, tag) @@ -167,31 +163,41 @@ func NewSnapshotFromMeta( vname := newVersionedEntityName(-1, dbinfo.Name.O, tag) // -1 means the entity is a schema. vname.target = dbinfo.ID snap.inner.schemaNameToID.ReplaceOrInsert(vname) - // get all tables Name - tableNames, err := meta.ListSimpleTables(dbinfo.ID) + + rawTables, err := meta.GetMetasByDBID(dbinfo.ID) if err != nil { return nil, cerror.WrapError(cerror.ErrMetaListDatabases, err) } - tableNeeded := make([]*timodel.TableNameInfo, 0, len(tableNames)) - // filter tables - for _, table := range tableNames { - if filter.ShouldIgnoreTable(dbinfo.Name.O, table.Name.O) { - log.Debug("ignore table", zap.String("table", table.Name.O)) + tableInfos := make([]*timodel.TableInfo, 0, len(rawTables)/2) + for _, r := range rawTables { + tableKey := string(r.Field) + if !strings.HasPrefix(tableKey, mTablePrefix) { continue } - tableNeeded = append(tableNeeded, table) - } - tableInfos := make([]*timodel.TableInfo, 0, len(tableNeeded)) - for _, table := range tableNeeded { - tableInfo, err := meta.GetTable(dbinfo.ID, table.ID) + + tbName := &timodel.TableNameInfo{} + err := json.Unmarshal(r.Value, tbName) + if err != nil { + return nil, errors.Trace(err) + } + + if filter.ShouldIgnoreTable(dbinfo.Name.O, tbName.Name.O) { + log.Debug("ignore table", zap.String("db", dbinfo.Name.O), + zap.String("table", tbName.Name.O)) + continue + } + + tbInfo := &timodel.TableInfo{} + err = json.Unmarshal(r.Value, tbInfo) if err != nil { return nil, errors.Trace(err) } - tableInfos = append(tableInfos, tableInfo) + tableInfos = append(tableInfos, tbInfo) } for _, tableInfo := range tableInfos { tableInfo := model.WrapTableInfo(dbinfo.ID, dbinfo.Name.O, currentTs, tableInfo) + tableCount++ snap.inner.tables.ReplaceOrInsert(versionedID{ id: tableInfo.ID, tag: tag, @@ -204,8 +210,8 @@ func NewSnapshotFromMeta( target: tableInfo.ID, }) - ineligible := !tableInfo.IsEligible(forceReplicate) - if ineligible { + eligible := tableInfo.IsEligible(forceReplicate) + if !eligible { snap.inner.ineligibleTables.ReplaceOrInsert(versionedID{id: tableInfo.ID, tag: tag}) } if pi := tableInfo.GetPartitionInfo(); pi != nil { @@ -213,16 +219,18 @@ func NewSnapshotFromMeta( vid := newVersionedID(partition.ID, tag) vid.target = tableInfo snap.inner.partitions.ReplaceOrInsert(vid) - if ineligible { + if !eligible { snap.inner.ineligibleTables.ReplaceOrInsert(versionedID{id: partition.ID, tag: tag}) } } } } } + snap.inner.currentTs = currentTs log.Info("schema snapshot created", zap.Stringer("changefeed", id), + zap.Int("tables", tableCount), zap.Uint64("currentTs", currentTs), zap.Any("duration", time.Since(start).Seconds())) return snap, nil diff --git a/cdc/entry/schema_test.go b/cdc/entry/schema_test.go index 480836fca31..b6c6366dbe0 100644 --- a/cdc/entry/schema_test.go +++ b/cdc/entry/schema_test.go @@ -92,42 +92,68 @@ func TestAllPhysicalTables(t *testing.T) { require.Equal(t, tableIDs, expectedTableIDs) } -func TestAllTables(t *testing.T) { +func TestNewSchemaStorage(t *testing.T) { helper := NewSchemaTestHelper(t) defer helper.Close() + cfg := config.GetDefaultReplicaConfig() + cfg.Filter.Rules = []string{"test.t1"} + f, err := filter.NewFilter(cfg, "") + require.Nil(t, err) + + // add table before create schema storage + job1 := helper.DDL2Job("create table test.t1 (id int primary key)") + helper.DDL2Job("create table test.t2 (id int primary key)") ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope) require.Nil(t, err) + schema, err := NewSchemaStorage(helper.Storage(), ver.Ver, + false, dummyChangeFeedID, util.RoleTester, f) + require.Nil(t, err) + require.NotNil(t, schema) + tableInfos, err := schema.AllTables(context.Background(), ver.Ver) + require.Nil(t, err) + require.Len(t, tableInfos, 1) + require.Equal(t, job1.BinlogInfo.TableInfo.Name.O, tableInfos[0].TableName.Table) +} + +func TestAllTables(t *testing.T) { + helper := NewSchemaTestHelper(t) + defer helper.Close() f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "") require.Nil(t, err) + // add table before create schema storage + job := helper.DDL2Job("create table test.dongment (id int primary key)") + ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope) + require.Nil(t, err) schema, err := NewSchemaStorage(helper.Storage(), ver.Ver, false, dummyChangeFeedID, util.RoleTester, f) require.Nil(t, err) tableInfos, err := schema.AllTables(context.Background(), ver.Ver) require.Nil(t, err) - require.Len(t, tableInfos, 0) + require.Len(t, tableInfos, 1) + require.Equal(t, job.BinlogInfo.TableInfo.Name.O, tableInfos[0].TableName.Table) // add normal table - job := helper.DDL2Job("create table test.t1(id int primary key)") + job = helper.DDL2Job("create table test.t1(id int primary key)") require.Nil(t, schema.HandleDDLJob(job)) tableInfos, err = schema.AllTables(context.Background(), job.BinlogInfo.FinishedTS) require.Nil(t, err) - require.Len(t, tableInfos, 1) - tableName := tableInfos[0].TableName + require.Len(t, tableInfos, 2) + tableName := tableInfos[1].TableName require.Equal(t, model.TableName{ Schema: "test", Table: "t1", - TableID: 104, + TableID: 106, }, tableName) // add ineligible table job = helper.DDL2Job("create table test.t2(id int)") require.Nil(t, schema.HandleDDLJob(job)) tableInfos, err = schema.AllTables(context.Background(), job.BinlogInfo.FinishedTS) require.Nil(t, err) - require.Len(t, tableInfos, 1) - tableName = tableInfos[0].TableName + require.Len(t, tableInfos, 2) + tableName = tableInfos[1].TableName require.Equal(t, model.TableName{ Schema: "test", Table: "t1", - TableID: 104, + TableID: 106, }, tableName) } diff --git a/cdc/entry/validator.go b/cdc/entry/validator.go index e19ceb70772..36fb663532d 100644 --- a/cdc/entry/validator.go +++ b/cdc/entry/validator.go @@ -34,7 +34,7 @@ func VerifyTables( err error, ) { meta := kv.GetSnapshotMeta(storage, startTs) - snap, err := schema.NewSingleSnapshotFromMeta( + snap, err := schema.NewSnapshotFromMeta( model.ChangeFeedID4Test("api", "verifyTable"), meta, startTs, false /* explicitTables */, f) if err != nil { diff --git a/go.mod b/go.mod index 1526dcd2520..304b1eace7d 100644 --- a/go.mod +++ b/go.mod @@ -68,7 +68,7 @@ require ( github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 github.com/pingcap/kvproto v0.0.0-20240227073058-929ab83f9754 github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d - github.com/pingcap/tidb v1.1.0-beta.0.20240801132827-888a58b0064b + github.com/pingcap/tidb v1.1.0-beta.0.20240802042051-abb56db84e19 github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7 github.com/pingcap/tidb-tools v0.0.0-20240305021104-9f9bea84490b github.com/pingcap/tidb/pkg/parser v0.0.0-20240801132827-888a58b0064b diff --git a/go.sum b/go.sum index 51e1ab5b98a..78f2a1c162f 100644 --- a/go.sum +++ b/go.sum @@ -815,8 +815,8 @@ github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d h1:y3EueKVfVykdpTyfU github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA= github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 h1:T4pXRhBflzDeAhmOQHNPRRogMYxP13V7BkYw3ZsoSfE= github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5/go.mod h1:rlimy0GcTvjiJqvD5mXTRr8O2eNZPBrcUgiWVYp9530= -github.com/pingcap/tidb v1.1.0-beta.0.20240801132827-888a58b0064b h1:SxypIc3So2jYPlrq1GaD8/grclanY25D/nytYqV1BYw= -github.com/pingcap/tidb v1.1.0-beta.0.20240801132827-888a58b0064b/go.mod h1:1b7fS/qABhsFxohbeF5vR1V7BuWqWMlAPXIi41TX3/Q= +github.com/pingcap/tidb v1.1.0-beta.0.20240802042051-abb56db84e19 h1:wy7GKUrK4G7vTcA7vEUmJRk0tkuj81PBnZuRzRIvj20= +github.com/pingcap/tidb v1.1.0-beta.0.20240802042051-abb56db84e19/go.mod h1:1b7fS/qABhsFxohbeF5vR1V7BuWqWMlAPXIi41TX3/Q= github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7 h1:eFu98FbfJB7PKWOtkaV6YNXXJWqDhczQX56j/iucgU4= github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= github.com/pingcap/tidb-tools v0.0.0-20240305021104-9f9bea84490b h1:UC4lLT2OBHGImFJbiXQfIxIck7AoFLIiRJ6Eo6uFVaw= diff --git a/tests/integration_tests/http_api_tls/run.sh b/tests/integration_tests/http_api_tls/run.sh index 4cb79e8cc71..a88fcfff6fb 100644 --- a/tests/integration_tests/http_api_tls/run.sh +++ b/tests/integration_tests/http_api_tls/run.sh @@ -103,7 +103,7 @@ function run() { # wait for above sql done in the up source sleep 2 - check_table_exists test.simple1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_exists test.verify_table_eligible ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} sequential_cases=( "list_changefeed"