Skip to content

Commit

Permalink
resolve conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer committed Nov 20, 2024
1 parent 9530fdc commit f345758
Show file tree
Hide file tree
Showing 15 changed files with 195 additions and 94 deletions.
2 changes: 1 addition & 1 deletion br/pkg/checkpoint/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ go_test(
srcs = ["checkpoint_test.go"],
flaky = True,
race = "on",
shard_count = 8,
shard_count = 9,
deps = [
":checkpoint",
"//br/pkg/gluetidb",
Expand Down
1 change: 0 additions & 1 deletion br/pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ func (r *CheckpointRunner[K, V]) WaitForFinish(ctx context.Context, flush bool)
// wait the range flusher exit
r.wg.Wait()
// remove the checkpoint lock
r.checkpointStorage.deleteLock(ctx)
r.checkpointStorage.close()
}

Expand Down
94 changes: 75 additions & 19 deletions br/pkg/checkpoint/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestCheckpointMetaForRestore(t *testing.T) {
},
},
}
err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, checkpointMetaForSnapshotRestore)
err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, checkpointMetaForSnapshotRestore)
require.NoError(t, err)
checkpointMetaForSnapshotRestore2, err := checkpoint.LoadCheckpointMetadataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor())
require.NoError(t, err)
Expand Down Expand Up @@ -278,9 +278,9 @@ func TestCheckpointRestoreRunner(t *testing.T) {
se, err := g.CreateSession(s.Mock.Storage)
require.NoError(t, err)

err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, &checkpoint.CheckpointMetadataForSnapshotRestore{})
err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, &checkpoint.CheckpointMetadataForSnapshotRestore{})
require.NoError(t, err)
checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, 5*time.Second, 3*time.Second)
checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, 5*time.Second, 3*time.Second)
require.NoError(t, err)

data := map[string]struct {
Expand Down Expand Up @@ -310,7 +310,7 @@ func TestCheckpointRestoreRunner(t *testing.T) {
}

for _, d := range data {
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, d.RangeKey)
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, d.RangeKey, "")
require.NoError(t, err)
}

Expand All @@ -320,7 +320,7 @@ func TestCheckpointRestoreRunner(t *testing.T) {
checkpointRunner.FlushChecksum(ctx, 4, 4, 4, 4)

for _, d := range data2 {
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, d.RangeKey)
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, d.RangeKey, "")
require.NoError(t, err)
}

Expand All @@ -343,7 +343,7 @@ func TestCheckpointRestoreRunner(t *testing.T) {
respCount += 1
}

_, err = checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), checker)
_, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), checkpoint.SnapshotRestoreCheckpointDatabaseName, checker)
require.NoError(t, err)
require.Equal(t, 4, respCount)

Expand All @@ -355,10 +355,10 @@ func TestCheckpointRestoreRunner(t *testing.T) {
require.Equal(t, checksum[i].Crc64xor, uint64(i))
}

err = checkpoint.RemoveCheckpointDataForSnapshotRestore(ctx, s.Mock.Domain, se)
err = checkpoint.RemoveCheckpointDataForSstRestore(ctx, s.Mock.Domain, se, checkpoint.SnapshotRestoreCheckpointDatabaseName)
require.NoError(t, err)

exists := checkpoint.ExistsSnapshotRestoreCheckpoint(ctx, s.Mock.Domain)
exists := checkpoint.ExistsSstRestoreCheckpoint(ctx, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName)
require.False(t, exists)
exists = s.Mock.Domain.InfoSchema().SchemaExists(pmodel.NewCIStr(checkpoint.SnapshotRestoreCheckpointDatabaseName))
require.False(t, exists)
Expand All @@ -371,9 +371,9 @@ func TestCheckpointRunnerRetry(t *testing.T) {
se, err := g.CreateSession(s.Mock.Storage)
require.NoError(t, err)

err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, &checkpoint.CheckpointMetadataForSnapshotRestore{})
err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, &checkpoint.CheckpointMetadataForSnapshotRestore{})
require.NoError(t, err)
checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, 100*time.Millisecond, 300*time.Millisecond)
checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, 100*time.Millisecond, 300*time.Millisecond)
require.NoError(t, err)

err = failpoint.Enable("github.com/pingcap/tidb/br/pkg/checkpoint/failed-after-checkpoint-flushes", "return(true)")
Expand All @@ -382,25 +382,26 @@ func TestCheckpointRunnerRetry(t *testing.T) {
err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/checkpoint/failed-after-checkpoint-flushes")
require.NoError(t, err)
}()
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, "123")
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, "123", "")
require.NoError(t, err)
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, "456")
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, "456", "")
require.NoError(t, err)
err = checkpointRunner.FlushChecksum(ctx, 1, 1, 1, 1)
require.NoError(t, err)
err = checkpointRunner.FlushChecksum(ctx, 2, 2, 2, 2)
time.Sleep(time.Second)
err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/checkpoint/failed-after-checkpoint-flushes")
require.NoError(t, err)
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 3, "789")
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 3, "789", "")
require.NoError(t, err)
err = checkpointRunner.FlushChecksum(ctx, 3, 3, 3, 3)
require.NoError(t, err)
checkpointRunner.WaitForFinish(ctx, true)
se, err = g.CreateSession(s.Mock.Storage)
require.NoError(t, err)
recordSet := make(map[string]int)
_, err = checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(),
_, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(),
checkpoint.SnapshotRestoreCheckpointDatabaseName,
func(tableID int64, rangeKey checkpoint.RestoreValueType) {
recordSet[fmt.Sprintf("%d_%s", tableID, rangeKey)] += 1
})
Expand All @@ -422,14 +423,14 @@ func TestCheckpointRunnerNoRetry(t *testing.T) {
se, err := g.CreateSession(s.Mock.Storage)
require.NoError(t, err)

err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, &checkpoint.CheckpointMetadataForSnapshotRestore{})
err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, &checkpoint.CheckpointMetadataForSnapshotRestore{})
require.NoError(t, err)
checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, 100*time.Millisecond, 300*time.Millisecond)
checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, 100*time.Millisecond, 300*time.Millisecond)
require.NoError(t, err)

err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, "123")
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, "123", "")
require.NoError(t, err)
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, "456")
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, "456", "")
require.NoError(t, err)
err = checkpointRunner.FlushChecksum(ctx, 1, 1, 1, 1)
require.NoError(t, err)
Expand All @@ -440,7 +441,8 @@ func TestCheckpointRunnerNoRetry(t *testing.T) {
se, err = g.CreateSession(s.Mock.Storage)
require.NoError(t, err)
recordSet := make(map[string]int)
_, err = checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(),
_, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(),
checkpoint.SnapshotRestoreCheckpointDatabaseName,
func(tableID int64, rangeKey checkpoint.RestoreValueType) {
recordSet[fmt.Sprintf("%d_%s", tableID, rangeKey)] += 1
})
Expand Down Expand Up @@ -584,3 +586,57 @@ func TestCheckpointRunnerLock(t *testing.T) {

runner.WaitForFinish(ctx, true)
}

func TestCheckpointCompactedRestoreRunner(t *testing.T) {
ctx := context.Background()
s := utiltest.CreateRestoreSchemaSuite(t)
g := gluetidb.New()
se, err := g.CreateSession(s.Mock.Storage)
require.NoError(t, err)

err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.CompactedRestoreCheckpointDatabaseName, nil)
require.NoError(t, err)
checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, checkpoint.CompactedRestoreCheckpointDatabaseName, 500*time.Millisecond, time.Second)
require.NoError(t, err)

data := map[string]struct {
Name string
}{
"a": {Name: "a"},
"A": {Name: "A"},
"1": {Name: "1"},
}

for _, d := range data {
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, "", d.Name)
require.NoError(t, err)
}

checkpointRunner.FlushChecksum(ctx, 1, 1, 1, 1)
checkpointRunner.FlushChecksum(ctx, 2, 2, 2, 2)

checkpointRunner.WaitForFinish(ctx, true)

se, err = g.CreateSession(s.Mock.Storage)
require.NoError(t, err)
respCount := 0
checker := func(tableID int64, resp checkpoint.RestoreValueType) {
require.NotNil(t, resp)
d, ok := data[resp.Name]
require.True(t, ok)
require.Equal(t, d.Name, resp.Name)
respCount++
}

_, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), checkpoint.CompactedRestoreCheckpointDatabaseName, checker)
require.NoError(t, err)
require.Equal(t, 3, respCount)

err = checkpoint.RemoveCheckpointDataForSstRestore(ctx, s.Mock.Domain, se, checkpoint.CompactedRestoreCheckpointDatabaseName)
require.NoError(t, err)

exists := checkpoint.ExistsSstRestoreCheckpoint(ctx, s.Mock.Domain, checkpoint.CompactedRestoreCheckpointDatabaseName)
require.False(t, exists)
exists = s.Mock.Domain.InfoSchema().SchemaExists(pmodel.NewCIStr(checkpoint.CompactedRestoreCheckpointDatabaseName))
require.False(t, exists)
}
9 changes: 0 additions & 9 deletions br/pkg/checkpoint/external_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,3 @@ func (s *externalCheckpointStorage) updateLock(ctx context.Context) error {

return nil
}

func (s *externalCheckpointStorage) deleteLock(ctx context.Context) {
if s.lockId > 0 {
err := s.storage.DeleteFile(ctx, s.CheckpointLockPath)
if err != nil {
log.Warn("failed to remove the checkpoint lock", zap.Error(err))
}
}
}
2 changes: 1 addition & 1 deletion br/pkg/checkpoint/log_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func TryToGetCheckpointTaskInfo(
return nil, errors.Trace(err)
}
}
hasSnapshotMetadata := ExistsSnapshotRestoreCheckpoint(ctx, dom)
hasSnapshotMetadata := ExistsSstRestoreCheckpoint(ctx, dom, SnapshotRestoreCheckpointDatabaseName)

return &CheckpointTaskInfoForLogRestore{
Metadata: metadata,
Expand Down
35 changes: 23 additions & 12 deletions br/pkg/checkpoint/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type RestoreKeyType = int64
type RestoreValueType struct {
// the file key of a range
RangeKey string
// the file name, used for compacted restore
Name string
}

func (rv RestoreValueType) IdentKey() []byte {
Expand All @@ -45,11 +47,12 @@ func valueMarshalerForRestore(group *RangeGroup[RestoreKeyType, RestoreValueType
func StartCheckpointRestoreRunnerForTest(
ctx context.Context,
se glue.Session,
dbName string,
tick time.Duration,
retryDuration time.Duration,
) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error) {
runner := newCheckpointRunner[RestoreKeyType, RestoreValueType](
newTableCheckpointStorage(se, SnapshotRestoreCheckpointDatabaseName),
newTableCheckpointStorage(se, dbName),
nil, valueMarshalerForRestore)

runner.startCheckpointMainLoop(ctx, tick, tick, 0, retryDuration)
Expand All @@ -60,9 +63,10 @@ func StartCheckpointRestoreRunnerForTest(
func StartCheckpointRunnerForRestore(
ctx context.Context,
se glue.Session,
dbName string,
) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error) {
runner := newCheckpointRunner[RestoreKeyType, RestoreValueType](
newTableCheckpointStorage(se, SnapshotRestoreCheckpointDatabaseName),
newTableCheckpointStorage(se, dbName),
nil, valueMarshalerForRestore)

// for restore, no need to set lock
Expand All @@ -77,23 +81,25 @@ func AppendRangesForRestore(
r *CheckpointRunner[RestoreKeyType, RestoreValueType],
tableID RestoreKeyType,
rangeKey string,
name string,
) error {
return r.Append(ctx, &CheckpointMessage[RestoreKeyType, RestoreValueType]{
GroupKey: tableID,
Group: []RestoreValueType{
{RangeKey: rangeKey},
{RangeKey: rangeKey, Name: name},
},
})
}

// load the whole checkpoint range data and retrieve the metadata of restored ranges
// and return the total time cost in the past executions
func LoadCheckpointDataForSnapshotRestore[K KeyType, V ValueType](
func LoadCheckpointDataForSstRestore[K KeyType, V ValueType](
ctx context.Context,
execCtx sqlexec.RestrictedSQLExecutor,
dbName string,
fn func(K, V),
) (time.Duration, error) {
return selectCheckpointData(ctx, execCtx, SnapshotRestoreCheckpointDatabaseName, fn)
return selectCheckpointData(ctx, execCtx, dbName, fn)
}

func LoadCheckpointChecksumForRestore(
Expand All @@ -118,28 +124,33 @@ func LoadCheckpointMetadataForSnapshotRestore(
return m, err
}

func SaveCheckpointMetadataForSnapshotRestore(
func SaveCheckpointMetadataForSstRestore(
ctx context.Context,
se glue.Session,
dbName string,
meta *CheckpointMetadataForSnapshotRestore,
) error {
err := initCheckpointTable(ctx, se, SnapshotRestoreCheckpointDatabaseName,
err := initCheckpointTable(ctx, se, dbName,
[]string{checkpointDataTableName, checkpointChecksumTableName})
if err != nil {
return errors.Trace(err)
}
return insertCheckpointMeta(ctx, se, SnapshotRestoreCheckpointDatabaseName, checkpointMetaTableName, meta)
if meta != nil {
return insertCheckpointMeta(ctx, se, dbName, checkpointMetaTableName, meta)
}
return nil
}

func ExistsSnapshotRestoreCheckpoint(
func ExistsSstRestoreCheckpoint(
ctx context.Context,
dom *domain.Domain,
dbName string,
) bool {
return dom.InfoSchema().
TableExists(pmodel.NewCIStr(SnapshotRestoreCheckpointDatabaseName), pmodel.NewCIStr(checkpointMetaTableName))
TableExists(pmodel.NewCIStr(dbName), pmodel.NewCIStr(checkpointMetaTableName))
}

func RemoveCheckpointDataForSnapshotRestore(ctx context.Context, dom *domain.Domain, se glue.Session) error {
return dropCheckpointTables(ctx, dom, se, SnapshotRestoreCheckpointDatabaseName,
func RemoveCheckpointDataForSstRestore(ctx context.Context, dom *domain.Domain, se glue.Session, dbName string) error {
return dropCheckpointTables(ctx, dom, se, dbName,
[]string{checkpointDataTableName, checkpointChecksumTableName, checkpointMetaTableName})
}
12 changes: 6 additions & 6 deletions br/pkg/checkpoint/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type checkpointStorage interface {

initialLock(ctx context.Context) error
updateLock(ctx context.Context) error
deleteLock(ctx context.Context)

close()
}
Expand All @@ -48,8 +47,9 @@ type checkpointStorage interface {
// 2. BR regards the metadata table as a file so that it is not empty if the table exists.
// 3. BR regards the checkpoint table as a directory which is managed by metadata table.
const (
LogRestoreCheckpointDatabaseName string = "__TiDB_BR_Temporary_Log_Restore_Checkpoint"
SnapshotRestoreCheckpointDatabaseName string = "__TiDB_BR_Temporary_Snapshot_Restore_Checkpoint"
LogRestoreCheckpointDatabaseName string = "__TiDB_BR_Temporary_Log_Restore_Checkpoint"
SnapshotRestoreCheckpointDatabaseName string = "__TiDB_BR_Temporary_Snapshot_Restore_Checkpoint"
CompactedRestoreCheckpointDatabaseName string = "__TiDB_BR_Temporary_Compacted_Restore_Checkpoint"

// directory level table
checkpointDataTableName string = "cpt_data"
Expand Down Expand Up @@ -90,7 +90,9 @@ const (

// IsCheckpointDB checks whether the dbname is checkpoint database.
func IsCheckpointDB(dbname pmodel.CIStr) bool {
return dbname.O == LogRestoreCheckpointDatabaseName || dbname.O == SnapshotRestoreCheckpointDatabaseName
return dbname.O == LogRestoreCheckpointDatabaseName ||
dbname.O == SnapshotRestoreCheckpointDatabaseName ||
dbname.O == CompactedRestoreCheckpointDatabaseName
}

const CheckpointIdMapBlockSize int = 524288
Expand Down Expand Up @@ -142,8 +144,6 @@ func (s *tableCheckpointStorage) updateLock(ctx context.Context) error {
return nil
}

func (s *tableCheckpointStorage) deleteLock(ctx context.Context) {}

func (s *tableCheckpointStorage) flushCheckpointData(ctx context.Context, data []byte) error {
sqls, argss := chunkInsertCheckpointSQLs(s.checkpointDBName, checkpointDataTableName, data)
for i, sql := range sqls {
Expand Down
Loading

0 comments on commit f345758

Please sign in to comment.