From 8da612505413b9ccb05e3e9c2696005d292df462 Mon Sep 17 00:00:00 2001 From: Wenqi Mou Date: Mon, 2 Dec 2024 21:30:20 -0700 Subject: [PATCH] This is an automated cherry-pick of #57742 Signed-off-by: ti-chi-bot --- br/pkg/task/stream.go | 34 +++++++++ br/pkg/utils/BUILD.bazel | 4 + br/pkg/utils/encryption.go | 44 +++++++++++ br/pkg/utils/wait.go | 49 ++++++++++++ .../run.sh | 51 +++++++++++++ br/tests/run_group_br_tests.sh | 74 +++++++++++++++++++ domain/domain.go | 31 ++++++++ domain/schema_validator.go | 6 ++ 8 files changed, 293 insertions(+) create mode 100644 br/pkg/utils/encryption.go create mode 100644 br/pkg/utils/wait.go create mode 100644 br/tests/br_pitr_long_running_schema_loading/run.sh create mode 100755 br/tests/run_group_br_tests.sh diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index f5b2a395a71c4..7e89a02c6cc33 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -73,6 +73,12 @@ const ( "You may check the metadata and continue by wait other task finish or manually delete the lock file " + truncateLockPath + " at the external storage." ) +const ( + waitInfoSchemaReloadCheckInterval = 1 * time.Second + // a million tables should take a few minutes to load all DDL change, making 15 to make sure we don't exit early + waitInfoSchemaReloadTimeout = 15 * time.Minute +) + var ( StreamStart = "log start" StreamStop = "log stop" @@ -1303,6 +1309,21 @@ func restoreStream( return errors.Annotate(err, "failed to restore kv files") } + // failpoint to stop for a while after restoring kvs + // this is to mimic the scenario that restore takes long time and the lease in schemaInfo has expired and needs refresh + failpoint.Inject("post-restore-kv-pending", func(val failpoint.Value) { + if val.(bool) { + // not ideal to use sleep but not sure what's the better way right now + log.Info("sleep after restoring kv") + time.Sleep(2 * time.Second) + } + }) + + // make sure schema reload finishes before proceeding + if err = waitUntilSchemaReload(ctx, client); err != nil { + return errors.Trace(err) + } + if err = client.CleanUpKVFiles(ctx); err != nil { return errors.Annotate(err, "failed to clean up") } @@ -1674,3 +1695,16 @@ func checkPiTRRequirements(ctx context.Context, g glue.Glue, cfg *RestoreConfig) return nil } + +func waitUntilSchemaReload(ctx context.Context, client *logclient.LogClient) error { + log.Info("waiting for schema info finishes reloading") + reloadStart := time.Now() + conditionFunc := func() bool { + return !client.GetDomain().IsLeaseExpired() + } + if err := utils.WaitUntil(ctx, conditionFunc, waitInfoSchemaReloadCheckInterval, waitInfoSchemaReloadTimeout); err != nil { + return errors.Annotate(err, "failed to wait until schema reload") + } + log.Info("reloading schema finished", zap.Duration("timeTaken", time.Since(reloadStart))) + return nil +} diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel index a54e0b1f95086..a38003564c70f 100644 --- a/br/pkg/utils/BUILD.bazel +++ b/br/pkg/utils/BUILD.bazel @@ -21,7 +21,11 @@ go_library( "safe_point.go", "schema.go", "store_manager.go", +<<<<<<< HEAD "suspend_importing.go", +======= + "wait.go", +>>>>>>> 8fe061897e8 (br: fix insert gc failed due to slow schema reload (#57742)) "worker.go", ], importpath = "github.com/pingcap/tidb/br/pkg/utils", diff --git a/br/pkg/utils/encryption.go b/br/pkg/utils/encryption.go new file mode 100644 index 0000000000000..9262c93a4e69d --- /dev/null +++ b/br/pkg/utils/encryption.go @@ -0,0 +1,44 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "github.com/pingcap/errors" + backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/kvproto/pkg/encryptionpb" + berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/pkg/util/encrypt" +) + +func Decrypt(content []byte, cipher *backuppb.CipherInfo, iv []byte) ([]byte, error) { + if len(content) == 0 || cipher == nil { + return content, nil + } + + switch cipher.CipherType { + case encryptionpb.EncryptionMethod_PLAINTEXT: + return content, nil + case encryptionpb.EncryptionMethod_AES128_CTR, + encryptionpb.EncryptionMethod_AES192_CTR, + encryptionpb.EncryptionMethod_AES256_CTR: + return encrypt.AESDecryptWithCTR(content, cipher.CipherKey, iv) + default: + return content, errors.Annotatef(berrors.ErrInvalidArgument, "cipher type invalid %s", cipher.CipherType) + } +} + +func IsEffectiveEncryptionMethod(method encryptionpb.EncryptionMethod) bool { + return method != encryptionpb.EncryptionMethod_UNKNOWN && method != encryptionpb.EncryptionMethod_PLAINTEXT +} diff --git a/br/pkg/utils/wait.go b/br/pkg/utils/wait.go new file mode 100644 index 0000000000000..5e5616eafb9eb --- /dev/null +++ b/br/pkg/utils/wait.go @@ -0,0 +1,49 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "context" + "time" + + "github.com/pingcap/errors" +) + +func WaitUntil(ctx context.Context, condition func() bool, checkInterval, maxTimeout time.Duration) error { + // do a quick check before starting the ticker + if condition() { + return nil + } + + timeoutCtx, cancel := context.WithTimeout(ctx, maxTimeout) + defer cancel() + + ticker := time.NewTicker(checkInterval) + defer ticker.Stop() + + for { + select { + case <-timeoutCtx.Done(): + if ctx.Err() != nil { + return ctx.Err() + } + return errors.Errorf("waitUntil timed out after waiting for %v", maxTimeout) + case <-ticker.C: + if condition() { + return nil + } + } + } +} diff --git a/br/tests/br_pitr_long_running_schema_loading/run.sh b/br/tests/br_pitr_long_running_schema_loading/run.sh new file mode 100644 index 0000000000000..e6f32c08bcdce --- /dev/null +++ b/br/tests/br_pitr_long_running_schema_loading/run.sh @@ -0,0 +1,51 @@ +#!/bin/bash +# +# Copyright 2024 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eu +. run_services +CUR=$(cd `dirname $0`; pwd) + +TASK_NAME="pitr_long_running_schema_loading" +res_file="$TEST_DIR/sql_res.$TEST_NAME.txt" +DB="$TEST_NAME" + +restart_services + +run_sql "CREATE SCHEMA $DB;" + +# start the log backup +run_br --pd $PD_ADDR log start --task-name $TASK_NAME -s "local://$TEST_DIR/$TASK_NAME/log" + +run_sql "USE $DB; CREATE TABLE t1 (id INT PRIMARY KEY, value VARCHAR(255));" +run_sql "USE $DB; INSERT INTO t1 VALUES (1, 'before-backup-1'), (2, 'before-backup-2');" + + +# do a full backup +run_br --pd "$PD_ADDR" backup full -s "local://$TEST_DIR/$TASK_NAME/full" + +run_sql "USE $DB; INSERT INTO t1 VALUES (3, 'after-backup-1'), (4, 'after-backup-2');" +run_sql "USE $DB; DROP TABLE t1;" +run_sql "USE $DB; CREATE TABLE t2 (id INT PRIMARY KEY, data TEXT);" +run_sql "USE $DB; INSERT INTO t2 VALUES (1, 'new-table-data');" + +echo "wait checkpoint advance" +. "$CUR/../br_test_utils.sh" && wait_log_checkpoint_advance $TASK_NAME + +restart_services + +export GO_FAILPOINTS="github.com/pingcap/tidb/pkg/domain/mock-load-schema-long-time=return(true);github.com/pingcap/tidb/br/pkg/task/post-restore-kv-pending=return(true)" +run_br --pd "$PD_ADDR" restore point -s "local://$TEST_DIR/$TASK_NAME/log" --full-backup-storage "local://$TEST_DIR/$TASK_NAME/full" +export GO_FAILPOINTS="" diff --git a/br/tests/run_group_br_tests.sh b/br/tests/run_group_br_tests.sh new file mode 100755 index 0000000000000..fdb9dd6d6ea6b --- /dev/null +++ b/br/tests/run_group_br_tests.sh @@ -0,0 +1,74 @@ +#!/usr/bin/env bash + +# This script split the integration tests into 9 groups to support parallel group tests execution. +# all the integration tests are located in br/tests directory. only the directories +# containing run.sh will be considered as valid br integration tests. the script will print the total case number + +set -eo pipefail + +# Step 1 +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +group=$1 +export COV_DIR="/tmp/group_cover" +rm -rf $COV_DIR +mkdir -p $COV_DIR + +# Define groups +# Note: If new group is added, the group name must also be added to CI +# * https://github.com/PingCAP-QE/ci/blob/main/pipelines/pingcap/tidb/latest/pull_br_integration_test.groovy +# Each group of tests consumes as much time as possible, thus reducing CI waiting time. +# Putting multiple light tests together and heavy tests in a separate group. +declare -A groups +groups=( + ["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable br_history br_gcs br_rawkv br_tidb_placement_policy" + ["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full br_table_partition br_full_ddl br_tiflash" + ["G02"]="br_full_cluster_restore br_full_index br_incremental_ddl br_pitr_failpoint br_pitr_gc_safepoint br_other br_pitr_long_running_schema_loading" + ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_index br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index' + ["G04"]='br_range br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table' + ["G05"]='br_skip_checksum br_split_region_fail br_systables br_table_filter br_txn br_stats br_clustered_index br_crypter br_partition_add_index' + ["G06"]='br_tikv_outage br_tikv_outage3 br_restore_checkpoint br_encryption' + ["G07"]='br_pitr' + ["G08"]='br_tikv_outage2 br_ttl br_views_and_sequences br_z_gc_safepoint br_autorandom br_file_corruption br_tiflash_conflict' +) + +# Get other cases not in groups, to avoid missing any case +others=() +for script in "$CUR"/*/run.sh; do + test_name="$(basename "$(dirname "$script")")" + if [[ $test_name != br* ]]; then + continue + fi + # shellcheck disable=SC2076 + if [[ ! " ${groups[*]} " =~ " ${test_name} " ]]; then + others=("${others[@]} ${test_name}") + fi +done + +# enable local encryption for all tests +ENABLE_ENCRYPTION=true +export ENABLE_ENCRYPTION + +if [[ "$group" == "others" ]]; then + if [[ -z $others ]]; then + echo "All br integration test cases have been added to groups" + exit 0 + fi + echo "Error: "$others" is not added to any group in br/tests/run_group_br_tests.sh" + exit 1 +elif [[ " ${!groups[*]} " =~ " ${group} " ]]; then + test_names="${groups[${group}]}" + # Run test cases + if [[ -n $test_names ]]; then + echo "" + echo "Run cases: ${test_names}" + for case_name in $test_names; do + echo "Run cases: ${case_name}" + rm -rf /tmp/backup_restore_test + mkdir -p /tmp/backup_restore_test + TEST_NAME=${case_name} ${CUR}/run.sh + done + fi +else + echo "Error: invalid group name: ${group}" + exit 1 +fi diff --git a/domain/domain.go b/domain/domain.go index fca482e873b28..da0a0afed8cd7 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -224,6 +224,19 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i logutil.BgLogger().Error("failed to load schema diff", zap.Error(err)) } +<<<<<<< HEAD:domain/domain.go +======= + // add failpoint to simulate long-running schema loading scenario + failpoint.Inject("mock-load-schema-long-time", func(val failpoint.Value) { + if val.(bool) { + // not ideal to use sleep, but not sure if there is a better way + logutil.BgLogger().Error("sleep before doing a full load") + time.Sleep(15 * time.Second) + } + }) + + // full load. +>>>>>>> 8fe061897e8 (br: fix insert gc failed due to slow schema reload (#57742)):pkg/domain/domain.go schemas, err := do.fetchAllSchemasWithTables(m) if err != nil { return nil, false, currentSchemaVersion, nil, err @@ -1111,6 +1124,24 @@ func (do *Domain) Init( return nil } +<<<<<<< HEAD:domain/domain.go +======= +// GetSchemaLease return the schema lease. +func (do *Domain) GetSchemaLease() time.Duration { + return do.schemaLease +} + +// IsLeaseExpired returns whether lease has expired +func (do *Domain) IsLeaseExpired() bool { + return do.SchemaValidator.IsLeaseExpired() +} + +// InitInfo4Test init infosync for distributed execution test. +func (do *Domain) InitInfo4Test() { + infosync.MockGlobalServerInfoManagerEntry.Add(do.ddl.GetID(), do.ServerID) +} + +>>>>>>> 8fe061897e8 (br: fix insert gc failed due to slow schema reload (#57742)):pkg/domain/domain.go // SetOnClose used to set do.onClose func. func (do *Domain) SetOnClose(onClose func()) { do.onClose = onClose diff --git a/domain/schema_validator.go b/domain/schema_validator.go index 495306a6ff936..73fa7be1e0e36 100644 --- a/domain/schema_validator.go +++ b/domain/schema_validator.go @@ -57,6 +57,8 @@ type SchemaValidator interface { Reset() // IsStarted indicates whether SchemaValidator is started. IsStarted() bool + // IsLeaseExpired checks whether the current lease has expired + IsLeaseExpired() bool } type deltaSchemaInfo struct { @@ -170,6 +172,10 @@ func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, cha } } +func (s *schemaValidator) IsLeaseExpired() bool { + return time.Now().After(s.latestSchemaExpire) +} + // isRelatedTablesChanged returns the result whether relatedTableIDs is changed // from usedVer to the latest schema version. // NOTE, this function should be called under lock!