Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: fix insert gc failed due to slow schema reload (#57742) #58215

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ const (
"You may check the metadata and continue by wait other task finish or manually delete the lock file " + truncateLockPath + " at the external storage."
)

const (
waitInfoSchemaReloadCheckInterval = 1 * time.Second
// a million tables should take a few minutes to load all DDL change, making 15 to make sure we don't exit early
waitInfoSchemaReloadTimeout = 15 * time.Minute
)

var (
StreamStart = "log start"
StreamStop = "log stop"
Expand Down Expand Up @@ -1445,6 +1451,21 @@ func restoreStream(
return errors.Annotate(err, "failed to restore kv files")
}

// failpoint to stop for a while after restoring kvs
// this is to mimic the scenario that restore takes long time and the lease in schemaInfo has expired and needs refresh
failpoint.Inject("post-restore-kv-pending", func(val failpoint.Value) {
if val.(bool) {
// not ideal to use sleep but not sure what's the better way right now
log.Info("sleep after restoring kv")
time.Sleep(2 * time.Second)
}
})

// make sure schema reload finishes before proceeding
if err = waitUntilSchemaReload(ctx, client); err != nil {
return errors.Trace(err)
}

if err = client.CleanUpKVFiles(ctx); err != nil {
return errors.Annotate(err, "failed to clean up")
}
Expand Down Expand Up @@ -1869,3 +1890,16 @@ func checkPiTRTaskInfo(

return curTaskInfo, doFullRestore, nil
}

func waitUntilSchemaReload(ctx context.Context, client *logclient.LogClient) error {
log.Info("waiting for schema info finishes reloading")
reloadStart := time.Now()
conditionFunc := func() bool {
return !client.GetDomain().IsLeaseExpired()
}
if err := utils.WaitUntil(ctx, conditionFunc, waitInfoSchemaReloadCheckInterval, waitInfoSchemaReloadTimeout); err != nil {
return errors.Annotate(err, "failed to wait until schema reload")
}
log.Info("reloading schema finished", zap.Duration("timeTaken", time.Since(reloadStart)))
return nil
}
1 change: 1 addition & 0 deletions br/pkg/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"safe_point.go",
"schema.go",
"store_manager.go",
"wait.go",
"worker.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/utils",
Expand Down
44 changes: 44 additions & 0 deletions br/pkg/utils/encryption.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/encryptionpb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/pkg/util/encrypt"
)

func Decrypt(content []byte, cipher *backuppb.CipherInfo, iv []byte) ([]byte, error) {
if len(content) == 0 || cipher == nil {
return content, nil
}

switch cipher.CipherType {
case encryptionpb.EncryptionMethod_PLAINTEXT:
return content, nil
case encryptionpb.EncryptionMethod_AES128_CTR,
encryptionpb.EncryptionMethod_AES192_CTR,
encryptionpb.EncryptionMethod_AES256_CTR:
return encrypt.AESDecryptWithCTR(content, cipher.CipherKey, iv)
default:
return content, errors.Annotatef(berrors.ErrInvalidArgument, "cipher type invalid %s", cipher.CipherType)
}
}

func IsEffectiveEncryptionMethod(method encryptionpb.EncryptionMethod) bool {
return method != encryptionpb.EncryptionMethod_UNKNOWN && method != encryptionpb.EncryptionMethod_PLAINTEXT
}
49 changes: 49 additions & 0 deletions br/pkg/utils/wait.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
"context"
"time"

"github.com/pingcap/errors"
)

func WaitUntil(ctx context.Context, condition func() bool, checkInterval, maxTimeout time.Duration) error {
// do a quick check before starting the ticker
if condition() {
return nil
}

timeoutCtx, cancel := context.WithTimeout(ctx, maxTimeout)
defer cancel()

ticker := time.NewTicker(checkInterval)
defer ticker.Stop()

for {
select {
case <-timeoutCtx.Done():
if ctx.Err() != nil {
return ctx.Err()
}
return errors.Errorf("waitUntil timed out after waiting for %v", maxTimeout)
case <-ticker.C:
if condition() {
return nil
}
}
}
}
51 changes: 51 additions & 0 deletions br/tests/br_pitr_long_running_schema_loading/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/bin/bash
#
# Copyright 2024 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu
. run_services
CUR=$(cd `dirname $0`; pwd)

TASK_NAME="pitr_long_running_schema_loading"
res_file="$TEST_DIR/sql_res.$TEST_NAME.txt"
DB="$TEST_NAME"

restart_services

run_sql "CREATE SCHEMA $DB;"

# start the log backup
run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log"

run_sql "USE $DB; CREATE TABLE t1 (id INT PRIMARY KEY, value VARCHAR(255));"
run_sql "USE $DB; INSERT INTO t1 VALUES (1, 'before-backup-1'), (2, 'before-backup-2');"


# do a full backup
run_br --pd "$PD_ADDR" backup full -s "local://$TEST_DIR/$TASK_NAME/full"

run_sql "USE $DB; INSERT INTO t1 VALUES (3, 'after-backup-1'), (4, 'after-backup-2');"
run_sql "USE $DB; DROP TABLE t1;"
run_sql "USE $DB; CREATE TABLE t2 (id INT PRIMARY KEY, data TEXT);"
run_sql "USE $DB; INSERT INTO t2 VALUES (1, 'new-table-data');"

echo "wait checkpoint advance"
. "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance $TASK_NAME

restart_services

export GO_FAILPOINTS="github.com/pingcap/tidb/pkg/domain/mock-load-schema-long-time=return(true);github.com/pingcap/tidb/br/pkg/task/post-restore-kv-pending=return(true)"
run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full"
export GO_FAILPOINTS=""
7 changes: 7 additions & 0 deletions br/tests/run_group_br_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,17 @@ mkdir -p $COV_DIR
# Putting multiple light tests together and heavy tests in a separate group.
declare -A groups
groups=(
<<<<<<< HEAD
["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable br_history br_gcs br_rawkv"
["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full br_table_partition br_full_ddl"
["G02"]="br_full_cluster_restore br_full_index br_incremental_ddl br_pitr_failpoint"
["G03"]='br_incompatible_tidb_config br_incremental br_incremental_index br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index br_other br_partition_add_index br_tidb_placement_policy br_tiflash'
=======
["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable br_history br_gcs br_rawkv br_tidb_placement_policy"
["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full br_table_partition br_full_ddl br_tiflash"
["G02"]="br_full_cluster_restore br_full_index br_incremental_ddl br_pitr_failpoint br_pitr_gc_safepoint br_other br_pitr_long_running_schema_loading"
["G03"]='br_incompatible_tidb_config br_incremental br_incremental_index br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index'
>>>>>>> 8fe061897e8 (br: fix insert gc failed due to slow schema reload (#57742))
["G04"]='br_range br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table'
["G05"]='br_skip_checksum br_small_batch_size br_split_region_fail br_systables br_table_filter br_txn br_stats br_clustered_index br_crypter'
["G06"]='br_tikv_outage'
Expand Down
23 changes: 23 additions & 0 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,16 @@
// We can fall back to full load, don't need to return the error.
logutil.BgLogger().Error("failed to load schema diff", zap.Error(err))
}

// add failpoint to simulate long-running schema loading scenario
failpoint.Inject("mock-load-schema-long-time", func(val failpoint.Value) {
if val.(bool) {
// not ideal to use sleep, but not sure if there is a better way
logutil.BgLogger().Error("sleep before doing a full load")
time.Sleep(15 * time.Second)
}
})

// full load.
schemas, err := do.fetchAllSchemasWithTables(m)
if err != nil {
Expand Down Expand Up @@ -1328,6 +1338,19 @@
return nil
}

<<<<<<< HEAD

Check failure on line 1341 in pkg/domain/domain.go

View workflow job for this annotation

GitHub Actions / integration-test (5.7.35)

syntax error: non-declaration statement outside function body

Check failure on line 1341 in pkg/domain/domain.go

View workflow job for this annotation

GitHub Actions / Compile for macos-latest

syntax error: non-declaration statement outside function body

Check failure on line 1341 in pkg/domain/domain.go

View workflow job for this annotation

GitHub Actions / Compile for FreeBSD job

syntax error: non-declaration statement outside function body

Check failure on line 1341 in pkg/domain/domain.go

View workflow job for this annotation

GitHub Actions / integration-test (8.0.26)

syntax error: non-declaration statement outside function body
=======
// GetSchemaLease return the schema lease.
func (do *Domain) GetSchemaLease() time.Duration {
return do.schemaLease
}

// IsLeaseExpired returns whether lease has expired
func (do *Domain) IsLeaseExpired() bool {
return do.SchemaValidator.IsLeaseExpired()
}

>>>>>>> 8fe061897e8 (br: fix insert gc failed due to slow schema reload (#57742))

Check failure on line 1353 in pkg/domain/domain.go

View workflow job for this annotation

GitHub Actions / integration-test (5.7.35)

syntax error: non-declaration statement outside function body

Check failure on line 1353 in pkg/domain/domain.go

View workflow job for this annotation

GitHub Actions / integration-test (5.7.35)

invalid character U+0023 '#'

Check failure on line 1353 in pkg/domain/domain.go

View workflow job for this annotation

GitHub Actions / Compile for macos-latest

syntax error: non-declaration statement outside function body

Check failure on line 1353 in pkg/domain/domain.go

View workflow job for this annotation

GitHub Actions / Compile for macos-latest

invalid character U+0023 '#'

Check failure on line 1353 in pkg/domain/domain.go

View workflow job for this annotation

GitHub Actions / Compile for FreeBSD job

syntax error: non-declaration statement outside function body

Check failure on line 1353 in pkg/domain/domain.go

View workflow job for this annotation

GitHub Actions / Compile for FreeBSD job

invalid character U+0023 '#'

Check failure on line 1353 in pkg/domain/domain.go

View workflow job for this annotation

GitHub Actions / integration-test (8.0.26)

syntax error: non-declaration statement outside function body

Check failure on line 1353 in pkg/domain/domain.go

View workflow job for this annotation

GitHub Actions / integration-test (8.0.26)

invalid character U+0023 '#'
// InitInfo4Test init infosync for distributed execution test.
func (do *Domain) InitInfo4Test() {
infosync.MockGlobalServerInfoManagerEntry.Add(do.ddl.GetID(), do.ServerID)
Expand Down
6 changes: 6 additions & 0 deletions pkg/domain/schema_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type SchemaValidator interface {
Reset()
// IsStarted indicates whether SchemaValidator is started.
IsStarted() bool
// IsLeaseExpired checks whether the current lease has expired
IsLeaseExpired() bool
}

type deltaSchemaInfo struct {
Expand Down Expand Up @@ -172,6 +174,10 @@ func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, cha
}
}

func (s *schemaValidator) IsLeaseExpired() bool {
return time.Now().After(s.latestSchemaExpire)
}

// isRelatedTablesChanged returns the result whether relatedTableIDs is changed
// from usedVer to the latest schema version.
// NOTE, this function should be called under lock!
Expand Down
Loading