Skip to content

Commit

Permalink
br: fix insert gc failed due to slow schema reload (pingcap#57742)
Browse files Browse the repository at this point in the history
close pingcap#57743

(cherry picked from commit 8fe0618)
  • Loading branch information
Tristan1900 committed Dec 12, 2024
1 parent 085de3a commit 2e649aa
Show file tree
Hide file tree
Showing 14 changed files with 157 additions and 1 deletion.
34 changes: 34 additions & 0 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ const (
"You may check the metadata and continue by wait other task finish or manually delete the lock file " + truncateLockPath + " at the external storage."
)

const (
waitInfoSchemaReloadCheckInterval = 1 * time.Second
// a million tables should take a few minutes to load all DDL change, making 15 to make sure we don't exit early
waitInfoSchemaReloadTimeout = 15 * time.Minute
)

var (
StreamStart = "log start"
StreamStop = "log stop"
Expand Down Expand Up @@ -1450,6 +1456,21 @@ func restoreStream(
return errors.Annotate(err, "failed to restore kv files")
}

// failpoint to stop for a while after restoring kvs
// this is to mimic the scenario that restore takes long time and the lease in schemaInfo has expired and needs refresh
failpoint.Inject("post-restore-kv-pending", func(val failpoint.Value) {
if val.(bool) {
// not ideal to use sleep but not sure what's the better way right now
log.Info("sleep after restoring kv")
time.Sleep(2 * time.Second)
}
})

// make sure schema reload finishes before proceeding
if err = waitUntilSchemaReload(ctx, client); err != nil {
return errors.Trace(err)
}

if err = client.CleanUpKVFiles(ctx); err != nil {
return errors.Annotate(err, "failed to clean up")
}
Expand Down Expand Up @@ -1860,3 +1881,16 @@ func checkPiTRTaskInfo(

return curTaskInfo, doFullRestore, nil
}

func waitUntilSchemaReload(ctx context.Context, client *logclient.LogClient) error {
log.Info("waiting for schema info finishes reloading")
reloadStart := time.Now()
conditionFunc := func() bool {
return !client.GetDomain().IsLeaseExpired()
}
if err := utils.WaitUntil(ctx, conditionFunc, waitInfoSchemaReloadCheckInterval, waitInfoSchemaReloadTimeout); err != nil {
return errors.Annotate(err, "failed to wait until schema reload")
}
log.Info("reloading schema finished", zap.Duration("timeTaken", time.Since(reloadStart)))
return nil
}
1 change: 1 addition & 0 deletions br/pkg/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"schema.go",
"sensitive.go",
"store_manager.go",
"wait.go",
"worker.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/utils",
Expand Down
49 changes: 49 additions & 0 deletions br/pkg/utils/wait.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
"context"
"time"

"github.com/pingcap/errors"
)

func WaitUntil(ctx context.Context, condition func() bool, checkInterval, maxTimeout time.Duration) error {
// do a quick check before starting the ticker
if condition() {
return nil
}

timeoutCtx, cancel := context.WithTimeout(ctx, maxTimeout)
defer cancel()

ticker := time.NewTicker(checkInterval)
defer ticker.Stop()

for {
select {
case <-timeoutCtx.Done():
if ctx.Err() != nil {
return ctx.Err()
}
return errors.Errorf("waitUntil timed out after waiting for %v", maxTimeout)
case <-ticker.C:
if condition() {
return nil
}
}
}
}
51 changes: 51 additions & 0 deletions br/tests/br_pitr_long_running_schema_loading/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/bin/bash
#
# Copyright 2024 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu
. run_services
CUR=$(cd `dirname $0`; pwd)

TASK_NAME="pitr_long_running_schema_loading"
res_file="$TEST_DIR/sql_res.$TEST_NAME.txt"
DB="$TEST_NAME"

restart_services

run_sql "CREATE SCHEMA $DB;"

# start the log backup
run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log"

run_sql "USE $DB; CREATE TABLE t1 (id INT PRIMARY KEY, value VARCHAR(255));"
run_sql "USE $DB; INSERT INTO t1 VALUES (1, 'before-backup-1'), (2, 'before-backup-2');"


# do a full backup
run_br --pd "$PD_ADDR" backup full -s "local://$TEST_DIR/$TASK_NAME/full"

run_sql "USE $DB; INSERT INTO t1 VALUES (3, 'after-backup-1'), (4, 'after-backup-2');"
run_sql "USE $DB; DROP TABLE t1;"
run_sql "USE $DB; CREATE TABLE t2 (id INT PRIMARY KEY, data TEXT);"
run_sql "USE $DB; INSERT INTO t2 VALUES (1, 'new-table-data');"

echo "wait checkpoint advance"
. "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance $TASK_NAME

restart_services

export GO_FAILPOINTS="github.com/pingcap/tidb/pkg/domain/mock-load-schema-long-time=return(true);github.com/pingcap/tidb/br/pkg/task/post-restore-kv-pending=return(true)"
run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full"
export GO_FAILPOINTS=""
2 changes: 1 addition & 1 deletion br/tests/run_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ declare -A groups
groups=(
["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable"
["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full"
["G02"]="br_full_cluster_restore br_full_ddl br_full_index br_gcs br_history br_pitr_failpoint"
["G02"]="br_full_cluster_restore br_full_ddl br_full_index br_gcs br_history br_pitr_failpoint br_pitr_long_running_schema_loading"
["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index br_pitr'
["G04"]='br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index br_other br_partition_add_index'
["G05"]='br_range br_rawkv br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table'
Expand Down
15 changes: 15 additions & 0 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,16 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
logutil.BgLogger().Error("failed to load schema diff", zap.Error(err))
}

// add failpoint to simulate long-running schema loading scenario
failpoint.Inject("mock-load-schema-long-time", func(val failpoint.Value) {
if val.(bool) {
// not ideal to use sleep, but not sure if there is a better way
logutil.BgLogger().Error("sleep before doing a full load")
time.Sleep(15 * time.Second)
}
})

// full load.
schemas, err := do.fetchAllSchemasWithTables(m)
if err != nil {
return nil, false, currentSchemaVersion, nil, err
Expand Down Expand Up @@ -1291,6 +1301,11 @@ func (do *Domain) Init(
return nil
}

// IsLeaseExpired returns whether lease has expired
func (do *Domain) IsLeaseExpired() bool {
return do.SchemaValidator.IsLeaseExpired()
}

// InitInfo4Test init infosync for distributed execution test.
func (do *Domain) InitInfo4Test() {
infosync.MockGlobalServerInfoManagerEntry.Add(do.ddl.GetID(), do.ServerID)
Expand Down
6 changes: 6 additions & 0 deletions pkg/domain/schema_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type SchemaValidator interface {
Reset()
// IsStarted indicates whether SchemaValidator is started.
IsStarted() bool
// IsLeaseExpired checks whether the current lease has expired
IsLeaseExpired() bool
}

type deltaSchemaInfo struct {
Expand Down Expand Up @@ -172,6 +174,10 @@ func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, cha
}
}

func (s *schemaValidator) IsLeaseExpired() bool {
return time.Now().After(s.latestSchemaExpire)
}

// isRelatedTablesChanged returns the result whether relatedTableIDs is changed
// from usedVer to the latest schema version.
// NOTE, this function should be called under lock!
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

0 comments on commit 2e649aa

Please sign in to comment.