From 9bf682b9c80785b4fbd9ce6cfcfa21723d69675f Mon Sep 17 00:00:00 2001 From: Wenqi Mou Date: Tue, 7 Jan 2025 20:50:45 -0500 Subject: [PATCH] address comments Signed-off-by: Wenqi Mou --- .../log_client/batch_meta_processor.go | 6 +- br/pkg/stream/table_mapping.go | 4 +- br/pkg/task/BUILD.bazel | 2 +- br/pkg/task/restore.go | 20 ++- br/pkg/task/restore_test.go | 141 +++++++++++++++++- br/tests/br_pitr_table_filter/run.sh | 111 +++++++++++--- 6 files changed, 249 insertions(+), 35 deletions(-) diff --git a/br/pkg/restore/log_client/batch_meta_processor.go b/br/pkg/restore/log_client/batch_meta_processor.go index 79397117cb782..ca48d5aae4fb1 100644 --- a/br/pkg/restore/log_client/batch_meta_processor.go +++ b/br/pkg/restore/log_client/batch_meta_processor.go @@ -192,10 +192,8 @@ func (mp *MetaKVInfoProcessor) ProcessBatch( } else if !meta.IsDBkey(rawKey.Key) { // also see RewriteMetaKvEntry continue - } - - // collect table history indexed by table id, same id may have different table names in history - if meta.IsTableKey(rawKey.Field) { + } else if meta.IsTableKey(rawKey.Field) { + // collect table history indexed by table id, same id may have different table names in history var tableInfo model.TableInfo if err := json.Unmarshal(value, &tableInfo); err != nil { return nil, errors.Trace(err) diff --git a/br/pkg/stream/table_mapping.go b/br/pkg/stream/table_mapping.go index 750d3ffdaee77..c5315a98a290e 100644 --- a/br/pkg/stream/table_mapping.go +++ b/br/pkg/stream/table_mapping.go @@ -118,10 +118,9 @@ func (tm *TableMappingManager) ProcessTableValueAndUpdateIdMapping(dbID int64, t } // update table ID and partition ID. - tableInfo.ID = tableReplace.TableID partitions := tableInfo.GetPartitionInfo() if partitions != nil { - for i, partition := range partitions.Definitions { + for _, partition := range partitions.Definitions { newID, exist := tableReplace.PartitionMap[partition.ID] if !exist { newID, exist = tm.globalIdMap[partition.ID] @@ -131,7 +130,6 @@ func (tm *TableMappingManager) ProcessTableValueAndUpdateIdMapping(dbID int64, t } tableReplace.PartitionMap[partition.ID] = newID } - partitions.Definitions[i].ID = newID } } return nil diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index d6d4aaaf0291e..234da81fc0e20 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -115,7 +115,7 @@ go_test( ], embed = [":task"], flaky = True, - shard_count = 39, + shard_count = 40, deps = [ "//br/pkg/backup", "//br/pkg/config", diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 4437d076e1b94..78df38655f69d 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -862,7 +862,8 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s if cfg.logTableHistoryManager != nil { // adjust tables to restore in the snapshot restore phase since it will later be renamed during // log restore and will fall into or out of the filter range. 
- err := adjustTablesToRestoreAndCreateTableTracker(cfg.logTableHistoryManager, cfg.RestoreConfig, client, fileMap, tableMap) + err := AdjustTablesToRestoreAndCreateTableTracker(cfg.logTableHistoryManager, cfg.RestoreConfig, + client.GetDatabaseMap(), fileMap, tableMap) if err != nil { return errors.Trace(err) } @@ -1452,15 +1453,13 @@ func filterRestoreFiles( return } -func adjustTablesToRestoreAndCreateTableTracker( +func AdjustTablesToRestoreAndCreateTableTracker( logBackupTableHistory *stream.LogBackupTableHistoryManager, cfg *RestoreConfig, - client *snapclient.SnapClient, + snapshotDBMap map[int64]*metautil.Database, fileMap map[string]*backuppb.File, tableMap map[int64]*metautil.Table, ) (err error) { - snapshotDBMap := client.GetDatabaseMap() - // build tracker for pitr restore to use later piTRTableTracker := utils.NewPiTRTableTracker() @@ -1476,6 +1475,11 @@ func adjustTablesToRestoreAndCreateTableTracker( tableHistory := logBackupTableHistory.GetTableHistory() for tableID, dbIDAndTableName := range tableHistory { + if _, exists := tableMap[tableID]; exists { + // going to restore anyway, skip + continue + } + start := dbIDAndTableName[0] end := dbIDAndTableName[1] @@ -1504,15 +1508,15 @@ func adjustTablesToRestoreAndCreateTableTracker( // we need to restore original table if utils.MatchTable(cfg.TableFilter, dbName, end.TableName) { // put this db/table id into pitr tracker as it matches with user's filter - // have to update filter here since table might be empty or not in snapshot so nothing will be returned . + // have to update tracker here since table might be empty or not in snapshot so nothing will be returned . // but we still need to capture this table id to restore during log restore. piTRTableTracker.AddTable(end.DbID, tableID) // check if snapshot contains the original db/table originalDB, exists := snapshotDBMap[start.DbID] if !exists { - // original db created during log backup, snapshot doesn't have information about this db so doesn't - // need to restore at snapshot + // original db created during log backup, or full backup has a filter that filters out this db, + // either way snapshot doesn't have information about this db so doesn't need to restore at snapshot continue } diff --git a/br/pkg/task/restore_test.go b/br/pkg/task/restore_test.go index 27200e6bb486f..8c1cb581d8449 100644 --- a/br/pkg/task/restore_test.go +++ b/br/pkg/task/restore_test.go @@ -21,14 +21,16 @@ import ( "github.com/pingcap/tidb/br/pkg/mock" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/restore/tiflashrec" + "github.com/pingcap/tidb/br/pkg/stream" "github.com/pingcap/tidb/br/pkg/task" - utiltest "github.com/pingcap/tidb/br/pkg/utiltest" + "github.com/pingcap/tidb/br/pkg/utiltest" "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/types" + filter "github.com/pingcap/tidb/pkg/util/table-filter" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" @@ -523,3 +525,140 @@ func TestCheckTikvSpace(t *testing.T) { store := pdhttp.StoreInfo{Store: pdhttp.MetaStore{ID: 1}, Status: pdhttp.StoreStatus{Available: "500PB"}} require.NoError(t, task.CheckStoreSpace(400*pb, &store)) } + +func TestAdjustTablesToRestoreAndCreateTableTracker(t *testing.T) { + // test setup + // create test database and table maps + dbInfo1 := model.DBInfo{ + 
ID: 1, + Name: ast.NewCIStr("test_db_1"), + } + dbInfo2 := model.DBInfo{ + ID: 2, + Name: ast.NewCIStr("test_db_2"), + } + snapshotDBMap := map[int64]*metautil.Database{ + 1: { + Info: &dbInfo1, + }, + 2: { + Info: &dbInfo2, + }, + } + fileMap := map[string]*backuppb.File{ + "test_file": { + Name: "test_file", + }, + } + tableMap := map[int64]*metautil.Table{ + 11: { + DB: &dbInfo1, + Info: &model.TableInfo{ + ID: 11, + Name: ast.NewCIStr("test_table_11"), + }, + }, + 12: { + DB: &dbInfo1, + Info: &model.TableInfo{ + ID: 12, + Name: ast.NewCIStr("test_table_12"), + }, + }, + 21: { + DB: &dbInfo2, + Info: &model.TableInfo{ + ID: 21, + Name: ast.NewCIStr("test_table_21"), + }, + }, + } + + // Test case 1: Basic table tracking + logBackupTableHistory := stream.NewTableHistoryManager() + logBackupTableHistory.AddTableHistory(11, "test_table_11", 1) + logBackupTableHistory.AddTableHistory(12, "test_table_12", 1) + logBackupTableHistory.AddTableHistory(21, "test_table_21", 2) + testFilter, err := filter.Parse([]string{"test_db*.*"}) + require.NoError(t, err) + cfg := &task.RestoreConfig{ + Config: task.Config{ + TableFilter: testFilter, + }, + } + err = task.AdjustTablesToRestoreAndCreateTableTracker(logBackupTableHistory, cfg, snapshotDBMap, fileMap, tableMap) + require.NoError(t, err) + require.True(t, cfg.PiTRTableTracker.ContainsTable(1, 11)) + require.True(t, cfg.PiTRTableTracker.ContainsTable(1, 12)) + require.True(t, cfg.PiTRTableTracker.ContainsTable(2, 21)) + + // Test case 2: Table not in filter + tableFilter, err := filter.Parse([]string{"other_db.other_table"}) + cfg.TableFilter = tableFilter + logBackupTableHistory = stream.NewTableHistoryManager() + logBackupTableHistory.AddTableHistory(11, "test_table_11", 1) + logBackupTableHistory.AddTableHistory(12, "test_table_12", 1) + logBackupTableHistory.AddTableHistory(21, "test_table_21", 2) + err = task.AdjustTablesToRestoreAndCreateTableTracker(logBackupTableHistory, cfg, snapshotDBMap, fileMap, tableMap) + require.NoError(t, err) + require.False(t, cfg.PiTRTableTracker.ContainsTable(1, 11)) + require.False(t, cfg.PiTRTableTracker.ContainsTable(1, 12)) + require.False(t, cfg.PiTRTableTracker.ContainsTable(2, 21)) + + // Test case 3: New table created during log backup + logBackupTableHistory = stream.NewTableHistoryManager() + testFilter, err = filter.Parse([]string{"test_db*.*"}) + cfg.TableFilter = testFilter + logBackupTableHistory.AddTableHistory(11, "test_table_11", 1) + logBackupTableHistory.AddTableHistory(12, "test_table_12", 1) + logBackupTableHistory.AddTableHistory(21, "test_table_21", 2) + logBackupTableHistory.AddTableHistory(13, "new_table", 1) + err = task.AdjustTablesToRestoreAndCreateTableTracker(logBackupTableHistory, cfg, snapshotDBMap, fileMap, tableMap) + require.NoError(t, err) + require.True(t, cfg.PiTRTableTracker.ContainsTable(1, 11)) + require.True(t, cfg.PiTRTableTracker.ContainsTable(1, 12)) + require.True(t, cfg.PiTRTableTracker.ContainsTable(1, 13)) + require.True(t, cfg.PiTRTableTracker.ContainsTable(2, 21)) + + // Test case 4: Table renamed into filter during log backup + logBackupTableHistory = stream.NewTableHistoryManager() + logBackupTableHistory.AddTableHistory(11, "test_table_11", 1) // drop + logBackupTableHistory.AddTableHistory(11, "renamed_table", 2) // create + logBackupTableHistory.AddTableHistory(12, "test_table_12", 1) + logBackupTableHistory.AddTableHistory(21, "test_table_21", 2) + tableFilter, err = filter.Parse([]string{"test_db_2.*"}) + cfg.TableFilter = tableFilter + err = 
task.AdjustTablesToRestoreAndCreateTableTracker(logBackupTableHistory, cfg, snapshotDBMap, fileMap, tableMap) + require.NoError(t, err) + require.False(t, cfg.PiTRTableTracker.ContainsTable(1, 11)) + require.False(t, cfg.PiTRTableTracker.ContainsTable(1, 12)) + require.True(t, cfg.PiTRTableTracker.ContainsTable(2, 11)) + require.True(t, cfg.PiTRTableTracker.ContainsTable(2, 21)) + + // Test case 5: Table renamed out of filter during log backup + tableFilter, err = filter.Parse([]string{"test_db_1.*"}) + cfg.TableFilter = tableFilter + err = task.AdjustTablesToRestoreAndCreateTableTracker(logBackupTableHistory, cfg, snapshotDBMap, fileMap, tableMap) + require.NoError(t, err) + require.False(t, cfg.PiTRTableTracker.ContainsTable(1, 11)) + require.True(t, cfg.PiTRTableTracker.ContainsTable(1, 12)) + require.False(t, cfg.PiTRTableTracker.ContainsTable(2, 11)) + require.False(t, cfg.PiTRTableTracker.ContainsTable(2, 21)) + + // Test case 6: Log backup table not in snapshot (due to full backup with a different filter) + snapshotDBMap = map[int64]*metautil.Database{} + tableMap = map[int64]*metautil.Table{} + logBackupTableHistory.AddTableHistory(11, "test_table", 1) + err = task.AdjustTablesToRestoreAndCreateTableTracker(logBackupTableHistory, cfg, snapshotDBMap, fileMap, tableMap) + require.NoError(t, err) + require.False(t, cfg.PiTRTableTracker.ContainsTable(1, 11)) + + // Test case 7: DB created during log backup + snapshotDBMap = map[int64]*metautil.Database{} + tableMap = map[int64]*metautil.Table{} + logBackupTableHistory.RecordDBIdToName(1, "test_db_1") + logBackupTableHistory.AddTableHistory(11, "test_table", 1) + err = task.AdjustTablesToRestoreAndCreateTableTracker(logBackupTableHistory, cfg, snapshotDBMap, fileMap, tableMap) + require.NoError(t, err) + require.True(t, cfg.PiTRTableTracker.ContainsTable(1, 11)) +} diff --git a/br/tests/br_pitr_table_filter/run.sh b/br/tests/br_pitr_table_filter/run.sh index 999bf97e9d0a0..b8c89d45d1d3b 100755 --- a/br/tests/br_pitr_table_filter/run.sh +++ b/br/tests/br_pitr_table_filter/run.sh @@ -186,18 +186,75 @@ test_basic_filter() { echo "basic filter test cases passed" } +test_with_full_backup_filter() { + restart_services || { echo "Failed to restart services"; exit 1; } + + echo "start with full backup filter testing" + run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log" + + run_sql "create schema $DB;" + run_sql "create schema ${DB}_other;" + + echo "write initial data and do snapshot backup" + create_tables_with_values "full_backup" 3 + + run_br backup full -f "${DB}_other.*" -s "local://$TEST_DIR/$TASK_NAME/full" --pd $PD_ADDR + + echo "write more data and wait for log backup to catch up" + run_sql "create table ${DB}_other.test_table(c int); insert into ${DB}_other.test_table values (42);" + create_tables_with_values "log_backup" 3 + + . 
"$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance "$TASK_NAME" + + # restart services to clean up the cluster + restart_services || { echo "Failed to restart services"; exit 1; } + + echo "case 1 sanity check, zero filter" + run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" + + verify_tables "log_backup" 3 false + verify_tables "full_backup" 3 false + verify_other_db_tables true + + echo "case 2 with log backup table same filter" + run_sql "drop schema ${DB}_other;" + run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" -f "${DB}_other.*" + + verify_tables "log_backup" 3 false + verify_tables "full_backup" 3 false + verify_other_db_tables true + + echo "case 3 with log backup filter include nothing" + run_sql "drop schema ${DB}_other;" + run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" -f "${DB}_nothing.*" + + verify_tables "log_backup" 3 false + verify_tables "full_backup" 3 false + verify_other_db_tables false + + # cleanup + rm -rf "$TEST_DIR/$TASK_NAME" + + echo "with full backup filter test cases passed" +} + test_table_rename() { restart_services || { echo "Failed to restart services"; exit 1; } echo "start table rename with filter testing" run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log" + # create multiple schemas for cross-db rename testing run_sql "create schema $DB;" + run_sql "create schema ${DB}_other1;" + run_sql "create schema ${DB}_other2;" echo "write initial data and do snapshot backup" create_tables_with_values "full_backup" 3 create_tables_with_values "renamed_in" 3 create_tables_with_values "log_renamed_out" 3 + # add table for multiple rename test + run_sql "create table ${DB}_other1.multi_rename(c int); insert into ${DB}_other1.multi_rename values (42);" run_br backup full -f "$DB.*" -s "local://$TEST_DIR/$TASK_NAME/full" --pd $PD_ADDR @@ -205,6 +262,10 @@ test_table_rename() { create_tables_with_values "log_backup" 3 rename_tables "renamed_in" "log_backup_renamed_in" 3 rename_tables "log_renamed_out" "renamed_out" 3 + + # multiple renames across different databases + run_sql "rename table ${DB}_other1.multi_rename to ${DB}_other2.multi_rename;" + run_sql "rename table ${DB}_other2.multi_rename to $DB.log_multi_rename;" . 
"$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance "$TASK_NAME" @@ -223,6 +284,22 @@ test_table_rename() { verify_tables "renamed_out" 3 false verify_tables "log_renamed_out" 3 false + # verify multi-renamed table + run_sql "select count(*) = 1 from $DB.log_multi_rename where c = 42" || { + echo "Table multi_rename doesn't have expected value after multiple renames" + exit 1 + } + + # Verify table doesn't exist in intermediate databases + if run_sql "select * from ${DB}_other1.multi_rename" 2>/dev/null; then + echo "Table exists in ${DB}_other1 but should not" + exit 1 + fi + if run_sql "select * from ${DB}_other2.multi_rename" 2>/dev/null; then + echo "Table exists in ${DB}_other2 but should not" + exit 1 + fi + # cleanup rm -rf "$TEST_DIR/$TASK_NAME" @@ -377,11 +454,11 @@ test_system_tables() { run_sql "create schema $DB;" echo "write initial data and do snapshot backup" - # Create and populate a user table for reference + # create and populate a user table for reference run_sql "create table $DB.user_table(id int primary key);" run_sql "insert into $DB.user_table values (1);" - # Make some changes to system tables + # make some changes to system tables run_sql "create user 'test_user'@'%' identified by 'password';" run_sql "grant select on $DB.* to 'test_user'@'%';" @@ -394,36 +471,34 @@ test_system_tables() { . "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance "$TASK_NAME" - # restart services to clean up the cluster restart_services || { echo "Failed to restart services"; exit 1; } - echo "restore point-in-time backup including system tables" - run_br --pd "$PD_ADDR" restore point -f "*.*" -s "local://$TEST_DIR/$TASK_NAME/log" --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" - - # Verify system table changes were restored - # Check if user exists with correct privileges - run_sql "select count(*) = 1 from mysql.user where User = 'test_user' and Host = '%'" || { - echo "test_user not found in mysql.user table" + echo "PiTR should error out when system tables are included with explicit filter" + restore_fail=0 + run_br --pd "$PD_ADDR" restore point -f "*.*" -s "local://$TEST_DIR/$TASK_NAME/log" --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" || restore_fail=1 + if [ $restore_fail -ne 1 ]; then + echo "Expected restore to fail when including system tables with filter" exit 1 - } + fi - # Verify the privileges were restored correctly - run_sql "select count(*) = 1 from mysql.tables_priv where User = 'test_user' and Host = '%' and Table_priv = 'Insert'" || { - echo "Incorrect privileges for test_user" + # Also verify that specific system table filters fail + restore_fail=0 + run_br --pd "$PD_ADDR" restore point -f "mysql.*" -s "local://$TEST_DIR/$TASK_NAME/log" --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" || restore_fail=1 + if [ $restore_fail -ne 1 ]; then + echo "Expected restore to fail when explicitly filtering system tables" exit 1 - } + fi - # cleanup - run_sql "drop user 'test_user'@'%';" rm -rf "$TEST_DIR/$TASK_NAME" - echo "system tables test passed" } # run all test cases test_basic_filter +test_with_full_backup_filter test_table_rename test_with_checkpoint test_exchange_partition +test_system_tables echo "br pitr table filter all tests passed"