From 0a9f9c7daa24c3f356aff601dd5187b9dcb97c94 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 25 Dec 2024 12:03:06 +0800 Subject: [PATCH] advancer: fix the incorrect gc safepoint behaviours (#52835) (#56699) ref pingcap/tidb#52082 --- br/pkg/streamhelper/advancer.go | 2 +- br/pkg/streamhelper/advancer_env.go | 18 +++++++++++++++++- br/pkg/streamhelper/advancer_test.go | 2 +- br/pkg/streamhelper/basic_lib_for_test.go | 18 +++++++++++------- br/pkg/streamhelper/regioniter.go | 3 +++ br/pkg/streamhelper/regioniter_test.go | 4 ++++ 6 files changed, 37 insertions(+), 10 deletions(-) diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index c1b776a6b45b8..29f669e44679c 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -453,7 +453,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil { log.Warn("failed to clear global checkpoint", logutil.ShortError(err)) } - if _, err := c.env.BlockGCUntil(ctx, 0); err != nil { + if err := c.env.UnblockGC(ctx); err != nil { log.Warn("failed to remove service GC safepoint", logutil.ShortError(err)) } metrics.LastCheckpoint.DeleteLabelValues(e.Name) diff --git a/br/pkg/streamhelper/advancer_env.go b/br/pkg/streamhelper/advancer_env.go index 8c4c96aed6ac3..bd5de8a4a3051 100644 --- a/br/pkg/streamhelper/advancer_env.go +++ b/br/pkg/streamhelper/advancer_env.go @@ -4,8 +4,10 @@ package streamhelper import ( "context" + "math" "time" + "github.com/pingcap/errors" logbackup "github.com/pingcap/kvproto/pkg/logbackuppb" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/config" @@ -46,7 +48,21 @@ type PDRegionScanner struct { // Returns the minimal service GC safe point across all services. // If the arguments is `0`, this would remove the service safe point. func (c PDRegionScanner) BlockGCUntil(ctx context.Context, at uint64) (uint64, error) { - return c.UpdateServiceGCSafePoint(ctx, logBackupServiceID, int64(logBackupSafePointTTL.Seconds()), at) + minimalSafePoint, err := c.UpdateServiceGCSafePoint( + ctx, logBackupServiceID, int64(logBackupSafePointTTL.Seconds()), at) + if err != nil { + return 0, errors.Annotate(err, "failed to block gc until") + } + if minimalSafePoint > at { + return 0, errors.Errorf("minimal safe point %d is greater than the target %d", minimalSafePoint, at) + } + return at, nil +} + +func (c PDRegionScanner) UnblockGC(ctx context.Context) error { + // set ttl to 0, means remove the safe point. + _, err := c.UpdateServiceGCSafePoint(ctx, logBackupServiceID, 0, math.MaxUint64) + return err } // TODO: It should be able to synchoronize the current TS with the PD. diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index 8921a1c4661b6..c9bcb2bff0582 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -215,7 +215,7 @@ func TestGCServiceSafePoint(t *testing.T) { req.Eventually(func() bool { env.fakeCluster.mu.Lock() defer env.fakeCluster.mu.Unlock() - return env.serviceGCSafePoint == 0 + return env.serviceGCSafePoint != 0 && env.serviceGCSafePointDeleted }, 3*time.Second, 100*time.Millisecond) } diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 7b18f01f355de..f25a8021ca590 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -102,9 +102,10 @@ type fakeCluster struct { maxTs uint64 testCtx *testing.T - onGetClient func(uint64) error - serviceGCSafePoint uint64 - currentTS uint64 + onGetClient func(uint64) error + serviceGCSafePoint uint64 + serviceGCSafePointDeleted bool + currentTS uint64 } func (r *region) splitAt(newID uint64, k string) *region { @@ -265,10 +266,6 @@ func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.Ge func (f *fakeCluster) BlockGCUntil(ctx context.Context, at uint64) (uint64, error) { f.mu.Lock() defer f.mu.Unlock() - if at == 0 { - f.serviceGCSafePoint = at - return at, nil - } if f.serviceGCSafePoint > at { return f.serviceGCSafePoint, nil } @@ -276,6 +273,13 @@ func (f *fakeCluster) BlockGCUntil(ctx context.Context, at uint64) (uint64, erro return at, nil } +func (f *fakeCluster) UnblockGC(ctx context.Context) error { + f.mu.Lock() + defer f.mu.Unlock() + f.serviceGCSafePointDeleted = true + return nil +} + func (f *fakeCluster) FetchCurrentTS(ctx context.Context) (uint64, error) { return f.currentTS, nil } diff --git a/br/pkg/streamhelper/regioniter.go b/br/pkg/streamhelper/regioniter.go index d6aa6f800f22a..b6b7062f01f23 100644 --- a/br/pkg/streamhelper/regioniter.go +++ b/br/pkg/streamhelper/regioniter.go @@ -43,6 +43,9 @@ type TiKVClusterMeta interface { // For now, all tasks (exactly one task in fact) use the same checkpoint. BlockGCUntil(ctx context.Context, at uint64) (uint64, error) + // UnblockGC used to remove the service GC safe point in PD. + UnblockGC(ctx context.Context) error + FetchCurrentTS(ctx context.Context) (uint64, error) } diff --git a/br/pkg/streamhelper/regioniter_test.go b/br/pkg/streamhelper/regioniter_test.go index 6ef65a5c86987..0166e30b5e6d5 100644 --- a/br/pkg/streamhelper/regioniter_test.go +++ b/br/pkg/streamhelper/regioniter_test.go @@ -83,6 +83,10 @@ func (c constantRegions) BlockGCUntil(ctx context.Context, at uint64) (uint64, e return 0, status.Error(codes.Unimplemented, "Unsupported operation") } +func (c constantRegions) UnblockGC(ctx context.Context) error { + return status.Error(codes.Unimplemented, "Unsupported operation") +} + // TODO: It should be able to synchoronize the current TS with the PD. func (c constantRegions) FetchCurrentTS(ctx context.Context) (uint64, error) { return oracle.ComposeTS(time.Now().UnixMilli(), 0), nil