Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#57742
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
Tristan1900 authored and ti-chi-bot committed Dec 12, 2024
1 parent 65fd2ad commit 3019570
Show file tree
Hide file tree
Showing 8 changed files with 215 additions and 0 deletions.
34 changes: 34 additions & 0 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ const (
"You may check the metadata and continue by wait other task finish or manually delete the lock file " + truncateLockPath + " at the external storage."
)

const (
waitInfoSchemaReloadCheckInterval = 1 * time.Second
// a million tables should take a few minutes to load all DDL change, making 15 to make sure we don't exit early
waitInfoSchemaReloadTimeout = 15 * time.Minute
)

var (
StreamStart = "log start"
StreamStop = "log stop"
Expand Down Expand Up @@ -1445,6 +1451,21 @@ func restoreStream(
return errors.Annotate(err, "failed to restore kv files")
}

// failpoint to stop for a while after restoring kvs
// this is to mimic the scenario that restore takes long time and the lease in schemaInfo has expired and needs refresh
failpoint.Inject("post-restore-kv-pending", func(val failpoint.Value) {
if val.(bool) {
// not ideal to use sleep but not sure what's the better way right now
log.Info("sleep after restoring kv")
time.Sleep(2 * time.Second)
}
})

// make sure schema reload finishes before proceeding
if err = waitUntilSchemaReload(ctx, client); err != nil {
return errors.Trace(err)
}

if err = client.CleanUpKVFiles(ctx); err != nil {
return errors.Annotate(err, "failed to clean up")
}
Expand Down Expand Up @@ -1869,3 +1890,16 @@ func checkPiTRTaskInfo(

return curTaskInfo, doFullRestore, nil
}

func waitUntilSchemaReload(ctx context.Context, client *logclient.LogClient) error {
log.Info("waiting for schema info finishes reloading")
reloadStart := time.Now()
conditionFunc := func() bool {
return !client.GetDomain().IsLeaseExpired()
}
if err := utils.WaitUntil(ctx, conditionFunc, waitInfoSchemaReloadCheckInterval, waitInfoSchemaReloadTimeout); err != nil {
return errors.Annotate(err, "failed to wait until schema reload")
}
log.Info("reloading schema finished", zap.Duration("timeTaken", time.Since(reloadStart)))
return nil
}
1 change: 1 addition & 0 deletions br/pkg/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"safe_point.go",
"schema.go",
"store_manager.go",
"wait.go",
"worker.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/utils",
Expand Down
44 changes: 44 additions & 0 deletions br/pkg/utils/encryption.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/encryptionpb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/pkg/util/encrypt"
)

func Decrypt(content []byte, cipher *backuppb.CipherInfo, iv []byte) ([]byte, error) {
if len(content) == 0 || cipher == nil {
return content, nil
}

switch cipher.CipherType {
case encryptionpb.EncryptionMethod_PLAINTEXT:
return content, nil
case encryptionpb.EncryptionMethod_AES128_CTR,
encryptionpb.EncryptionMethod_AES192_CTR,
encryptionpb.EncryptionMethod_AES256_CTR:
return encrypt.AESDecryptWithCTR(content, cipher.CipherKey, iv)
default:
return content, errors.Annotatef(berrors.ErrInvalidArgument, "cipher type invalid %s", cipher.CipherType)
}
}

func IsEffectiveEncryptionMethod(method encryptionpb.EncryptionMethod) bool {
return method != encryptionpb.EncryptionMethod_UNKNOWN && method != encryptionpb.EncryptionMethod_PLAINTEXT
}
49 changes: 49 additions & 0 deletions br/pkg/utils/wait.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
"context"
"time"

"github.com/pingcap/errors"
)

func WaitUntil(ctx context.Context, condition func() bool, checkInterval, maxTimeout time.Duration) error {
// do a quick check before starting the ticker
if condition() {
return nil
}

timeoutCtx, cancel := context.WithTimeout(ctx, maxTimeout)
defer cancel()

ticker := time.NewTicker(checkInterval)
defer ticker.Stop()

for {
select {
case <-timeoutCtx.Done():
if ctx.Err() != nil {
return ctx.Err()
}
return errors.Errorf("waitUntil timed out after waiting for %v", maxTimeout)
case <-ticker.C:
if condition() {
return nil
}
}
}
}
51 changes: 51 additions & 0 deletions br/tests/br_pitr_long_running_schema_loading/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/bin/bash
#
# Copyright 2024 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu
. run_services
CUR=$(cd `dirname $0`; pwd)

TASK_NAME="pitr_long_running_schema_loading"
res_file="$TEST_DIR/sql_res.$TEST_NAME.txt"
DB="$TEST_NAME"

restart_services

run_sql "CREATE SCHEMA $DB;"

# start the log backup
run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log"

run_sql "USE $DB; CREATE TABLE t1 (id INT PRIMARY KEY, value VARCHAR(255));"
run_sql "USE $DB; INSERT INTO t1 VALUES (1, 'before-backup-1'), (2, 'before-backup-2');"


# do a full backup
run_br --pd "$PD_ADDR" backup full -s "local://$TEST_DIR/$TASK_NAME/full"

run_sql "USE $DB; INSERT INTO t1 VALUES (3, 'after-backup-1'), (4, 'after-backup-2');"
run_sql "USE $DB; DROP TABLE t1;"
run_sql "USE $DB; CREATE TABLE t2 (id INT PRIMARY KEY, data TEXT);"
run_sql "USE $DB; INSERT INTO t2 VALUES (1, 'new-table-data');"

echo "wait checkpoint advance"
. "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance $TASK_NAME

restart_services

export GO_FAILPOINTS="github.com/pingcap/tidb/pkg/domain/mock-load-schema-long-time=return(true);github.com/pingcap/tidb/br/pkg/task/post-restore-kv-pending=return(true)"
run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full"
export GO_FAILPOINTS=""
7 changes: 7 additions & 0 deletions br/tests/run_group_br_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,17 @@ mkdir -p $COV_DIR
# Putting multiple light tests together and heavy tests in a separate group.
declare -A groups
groups=(
<<<<<<< HEAD
["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable br_history br_gcs br_rawkv"
["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full br_table_partition br_full_ddl"
["G02"]="br_full_cluster_restore br_full_index br_incremental_ddl br_pitr_failpoint"
["G03"]='br_incompatible_tidb_config br_incremental br_incremental_index br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index br_other br_partition_add_index br_tidb_placement_policy br_tiflash'
=======
["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable br_history br_gcs br_rawkv br_tidb_placement_policy"
["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full br_table_partition br_full_ddl br_tiflash"
["G02"]="br_full_cluster_restore br_full_index br_incremental_ddl br_pitr_failpoint br_pitr_gc_safepoint br_other br_pitr_long_running_schema_loading"
["G03"]='br_incompatible_tidb_config br_incremental br_incremental_index br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index'
>>>>>>> 8fe061897e8 (br: fix insert gc failed due to slow schema reload (#57742))
["G04"]='br_range br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table'
["G05"]='br_skip_checksum br_small_batch_size br_split_region_fail br_systables br_table_filter br_txn br_stats br_clustered_index br_crypter'
["G06"]='br_tikv_outage'
Expand Down
23 changes: 23 additions & 0 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,16 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
// We can fall back to full load, don't need to return the error.
logutil.BgLogger().Error("failed to load schema diff", zap.Error(err))
}

// add failpoint to simulate long-running schema loading scenario
failpoint.Inject("mock-load-schema-long-time", func(val failpoint.Value) {
if val.(bool) {
// not ideal to use sleep, but not sure if there is a better way
logutil.BgLogger().Error("sleep before doing a full load")
time.Sleep(15 * time.Second)
}
})

// full load.
schemas, err := do.fetchAllSchemasWithTables(m)
if err != nil {
Expand Down Expand Up @@ -1328,6 +1338,19 @@ func (do *Domain) Init(
return nil
}

<<<<<<< HEAD
=======
// GetSchemaLease return the schema lease.
func (do *Domain) GetSchemaLease() time.Duration {
return do.schemaLease
}

// IsLeaseExpired returns whether lease has expired
func (do *Domain) IsLeaseExpired() bool {
return do.SchemaValidator.IsLeaseExpired()
}

>>>>>>> 8fe061897e8 (br: fix insert gc failed due to slow schema reload (#57742))
// InitInfo4Test init infosync for distributed execution test.
func (do *Domain) InitInfo4Test() {
infosync.MockGlobalServerInfoManagerEntry.Add(do.ddl.GetID(), do.ServerID)
Expand Down
6 changes: 6 additions & 0 deletions pkg/domain/schema_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type SchemaValidator interface {
Reset()
// IsStarted indicates whether SchemaValidator is started.
IsStarted() bool
// IsLeaseExpired checks whether the current lease has expired
IsLeaseExpired() bool
}

type deltaSchemaInfo struct {
Expand Down Expand Up @@ -172,6 +174,10 @@ func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, cha
}
}

func (s *schemaValidator) IsLeaseExpired() bool {
return time.Now().After(s.latestSchemaExpire)
}

// isRelatedTablesChanged returns the result whether relatedTableIDs is changed
// from usedVer to the latest schema version.
// NOTE, this function should be called under lock!
Expand Down

0 comments on commit 3019570

Please sign in to comment.