diff --git a/pkg/kv/kvserver/liveness/client_test.go b/pkg/kv/kvserver/liveness/client_test.go index 114de6bb2bc2..c4084d44208c 100644 --- a/pkg/kv/kvserver/liveness/client_test.go +++ b/pkg/kv/kvserver/liveness/client_test.go @@ -373,3 +373,52 @@ func TestGetActiveNodes(t *testing.T) { }) require.Equal(t, []roachpb.NodeID{1, 2, 3, 4}, getActiveNodes(nl1)) } + +// TestLivenessRangeGetsPeriodicallyCompacted tests that the liveness range +// gets compacted when we set the liveness range compaction interval. +func TestLivenessRangeGetsPeriodicallyCompacted(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + // Enable the liveness range compaction and set the interval to 1s to speed + // up the test. + c := tc.Server(0).SystemLayer().SQLConn(t) + _, err := c.ExecContext(ctx, "set cluster setting kv.liveness_range_compact.interval='1s'") + require.NoError(t, err) + + // Get the original file number of the sstable for the liveness range. We + // expect to see this file number change as the liveness range gets compacted. + livenessFileNumberQuery := "WITH replicas(n) AS (SELECT unnest(replicas) FROM " + + "crdb_internal.ranges_no_leases WHERE range_id = 2), sstables AS (SELECT " + + "(crdb_internal.sstable_metrics(n, n, start_key, end_key)).* " + + "FROM crdb_internal.ranges_no_leases, replicas WHERE range_id = 2) " + + "SELECT file_num FROM sstables" + + sqlDB := tc.ApplicationLayer(0).SQLConn(t) + var original_file_num string + testutils.SucceedsSoon(t, func() error { + rows := sqlDB.QueryRow(livenessFileNumberQuery) + if err := rows.Scan(&original_file_num); err != nil { + return err + } + return nil + }) + + // Expect that the liveness file number changes. + testutils.SucceedsSoon(t, func() error { + var current_file_num string + rows := sqlDB.QueryRow(livenessFileNumberQuery) + if err := rows.Scan(¤t_file_num); err != nil { + return err + } + if current_file_num == original_file_num { + return errors.Errorf("Liveness compaction hasn't happened yet") + } + return nil + }) +} diff --git a/pkg/server/node.go b/pkg/server/node.go index 5483796f7133..da6e740f15c7 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -14,6 +14,7 @@ import ( "bytes" "context" "fmt" + "math" "net" "sort" "strings" @@ -241,6 +242,13 @@ var ( `duration spent in processing above any available stack history is appended to its trace, if automatic trace snapshots are enabled`, time.Second*30, ) + + livenessRangeCompactInterval = settings.RegisterDurationSetting( + settings.SystemOnly, + "kv.liveness_range_compact.interval", + `interval at which the liveness range is compacted. A value of 0 disables the periodic compaction`, + 0, + ) ) // By default, stores will be started concurrently. @@ -810,6 +818,8 @@ func (n *Node) start( log.Infof(ctx, "started with engine type %v", &t) } log.Infof(ctx, "started with attributes %v", attrs.Attrs) + + n.startPeriodicLivenessCompaction(n.stopper, livenessRangeCompactInterval) return nil } @@ -1101,6 +1111,87 @@ func (n *Node) startComputePeriodicMetrics(stopper *stop.Stopper, interval time. }) } +// startPeriodicLivenessCompaction starts a loop where it periodically compacts +// the liveness range. +func (n *Node) startPeriodicLivenessCompaction( + stopper *stop.Stopper, livenessRangeCompactInterval *settings.DurationSetting, +) { + ctx := n.AnnotateCtx(context.Background()) + + // getCompactionInterval() returns the interval at which the liveness range is + // set to be compacted. If the interval is set to 0, the period is set to the + // max possible duration because a value of 0 cause the ticker to panic. + getCompactionInterval := func() time.Duration { + interval := livenessRangeCompactInterval.Get(&n.storeCfg.Settings.SV) + if interval == 0 { + interval = math.MaxInt64 + } + return interval + } + + if err := stopper.RunAsyncTask(ctx, "liveness-compaction", func(ctx context.Context) { + interval := getCompactionInterval() + ticker := time.NewTicker(interval) + + intervalChangeChan := make(chan time.Duration) + + // Update the compaction interval when the setting changes. + livenessRangeCompactInterval.SetOnChange(&n.storeCfg.Settings.SV, func(ctx context.Context) { + // intervalChangeChan is used to signal the compaction loop that the + // interval has changed. Avoid blocking the main goroutine that is + // responsible for handling all settings updates. + select { + case intervalChangeChan <- getCompactionInterval(): + default: + } + }) + + defer ticker.Stop() + for { + select { + case <-ticker.C: + // Find the liveness replica in order to compact it. + _ = n.stores.VisitStores(func(store *kvserver.Store) error { + store.VisitReplicas(func(repl *kvserver.Replica) bool { + span := repl.Desc().KeySpan().AsRawSpanWithNoLocals() + if keys.NodeLivenessSpan.Overlaps(span) { + + // The CompactRange() method expects the start and end keys to be + // encoded. + startEngineKey := + storage.EngineKey{ + Key: span.Key, + }.Encode() + + endEngineKey := + storage.EngineKey{ + Key: span.EndKey, + }.Encode() + + timeBeforeCompaction := timeutil.Now() + if err := store.StateEngine().CompactRange(startEngineKey, endEngineKey); err != nil { + log.Errorf(ctx, "failed compacting liveness replica: %+v with error: %s", repl, err) + } + + log.Infof(ctx, "finished compacting liveness replica: %+v and it took: %+v", + repl, timeutil.Since(timeBeforeCompaction)) + } + return true + }) + return nil + }) + case newInterval := <-intervalChangeChan: + ticker.Reset(newInterval) + case <-stopper.ShouldQuiesce(): + return + } + } + }); err != nil { + log.Errorf(ctx, "failed to start the async liveness compaction task") + } + +} + // updateNodeRangeCount updates the internal counter of the total ranges across // all stores. This value is used to make a decision on whether the node should // use expiration leases (see Replica.shouldUseExpirationLeaseRLocked).