Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
130026: storeliveness: intergrate with store and server r=nvanbenschoten a=miraradeva

This commit initializes the Store Liveness `Transport` upon starting the server, initializes the Store Liveness `SupportManager`, and sets up the store's `SupportManager` as a message handler in the node's `Transport`.

The `SupportManager` does not start heartbeating other stores until support from them is requested via `SupportFrom`.

Fixes: cockroachdb#125064

Release note: None

Co-authored-by: Mira Radeva <[email protected]>
  • Loading branch information
craig[bot] and miraradeva committed Sep 5, 2024
2 parents df4c2cd + b5db6e7 commit ae8d001
Show file tree
Hide file tree
Showing 13 changed files with 116 additions and 41 deletions.
8 changes: 8 additions & 0 deletions pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,14 @@ func (cfg RaftConfig) NodeLivenessDurations() (livenessActive, livenessRenewal t
return
}

// StoreLivenessDurations computes durations for store liveness heartbeat
// interval and liveness interval.
func (cfg RaftConfig) StoreLivenessDurations() (livenessInterval, heartbeatInterval time.Duration) {
livenessInterval = cfg.RangeLeaseDuration
heartbeatInterval = time.Duration(float64(livenessInterval) * livenessRenewalFraction)
return
}

// SentinelGossipTTL is time-to-live for the gossip sentinel. The sentinel
// informs a node whether or not it's connected to the primary gossip network
// and not just a partition. As such it must expire fairly quickly and be
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ go_test(
"//pkg/kv/kvserver/spanset",
"//pkg/kv/kvserver/split",
"//pkg/kv/kvserver/stateloader",
"//pkg/kv/kvserver/storeliveness",
"//pkg/kv/kvserver/tenantrate",
"//pkg/kv/kvserver/tscache",
"//pkg/kv/kvserver/txnwait",
Expand Down
8 changes: 1 addition & 7 deletions pkg/kv/kvserver/replica_store_liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"fmt"
"hash/fnv"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
slpb "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness/storelivenesspb"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/raft/raftstoreliveness"
Expand Down Expand Up @@ -94,12 +93,7 @@ func (r *replicaRLockedStoreLiveness) SupportFrom(

// SupportFromEnabled implements the raftstoreliveness.StoreLiveness interface.
func (r *replicaRLockedStoreLiveness) SupportFromEnabled() bool {
// TODO(mira): this version check is incorrect. For one, it doesn't belong
// here. Instead, the version should be checked when deciding to enable
// StoreLiveness or not. Then, the check here should only check whether store
// liveness is enabled.
storeLivenessEnabled := r.store.ClusterSettings().Version.IsActive(context.TODO(), clusterversion.V24_3_StoreLivenessEnabled)
if !storeLivenessEnabled {
if !r.store.storeLiveness.SupportFromEnabled(context.TODO()) {
return false
}
fracEnabled := raftLeaderFortificationFractionEnabled.Get(&r.store.ClusterSettings().SV)
Expand Down
38 changes: 25 additions & 13 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness"
slpb "github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness/storelivenesspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tscache"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnrecovery"
Expand Down Expand Up @@ -1154,17 +1155,18 @@ type StoreConfig struct {
AmbientCtx log.AmbientContext
base.RaftConfig

DefaultSpanConfig roachpb.SpanConfig
Settings *cluster.Settings
Clock *hlc.Clock
Gossip *gossip.Gossip
DB *kv.DB
NodeLiveness *liveness.NodeLiveness
StorePool *storepool.StorePool
Transport *RaftTransport
NodeDialer *nodedialer.Dialer
RPCContext *rpc.Context
RangeDescriptorCache *rangecache.RangeCache
DefaultSpanConfig roachpb.SpanConfig
Settings *cluster.Settings
Clock *hlc.Clock
Gossip *gossip.Gossip
DB *kv.DB
NodeLiveness *liveness.NodeLiveness
StorePool *storepool.StorePool
Transport *RaftTransport
StoreLivenessTransport *storeliveness.Transport
NodeDialer *nodedialer.Dialer
RPCContext *rpc.Context
RangeDescriptorCache *rangecache.RangeCache

ClosedTimestampSender *sidetransport.Sender
ClosedTimestampReceiver sidetransportReceiver
Expand Down Expand Up @@ -2174,8 +2176,18 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
)
s.metrics.registry.AddMetricStruct(s.recoveryMgr.Metrics())

// TODO(mira): create the store liveness support manager here.
// s.storeLiveness = ...
heartbeatInterval, livenessInterval := s.cfg.StoreLivenessDurations()
supportGracePeriod := s.cfg.RPCContext.StoreLivenessWithdrawalGracePeriod()
options := storeliveness.NewOptions(heartbeatInterval, livenessInterval, supportGracePeriod)
sm := storeliveness.NewSupportManager(
slpb.StoreIdent{NodeID: s.nodeDesc.NodeID}, s.StateEngine(), options,
s.cfg.Settings, s.stopper, s.cfg.Clock, s.cfg.StoreLivenessTransport,
)
s.cfg.StoreLivenessTransport.ListenMessages(s.StoreID(), sm)
s.storeLiveness = sm
if err = sm.Start(ctx); err != nil {
return errors.Wrap(err, "starting store liveness")
}

s.rangeIDAlloc = idAlloc

Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities/tenantcapabilitiesauthorizer"
"github.com/cockroachdb/cockroach/pkg/raft"
Expand Down Expand Up @@ -248,6 +249,9 @@ func createTestStoreWithoutStart(
nil, /* PiggybackedAdmittedResponseScheduler */
nil, /* knobs */
)
cfg.StoreLivenessTransport = storeliveness.NewTransport(
cfg.AmbientCtx, stopper, cfg.Clock, cfg.NodeDialer, server,
)

stores := NewStores(cfg.AmbientCtx, cfg.Clock)
nodeDesc := &roachpb.NodeDescriptor{NodeID: 1}
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/storeliveness/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "storeliveness",
srcs = [
"config.go",
"fabric.go",
"persist.go",
"requester_state.go",
Expand Down
46 changes: 46 additions & 0 deletions pkg/kv/kvserver/storeliveness/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storeliveness

import "time"

// Options includes all Store Liveness durations needed by the SupportManager.
type Options struct {
// HeartbeatInterval determines how often Store Liveness sends heartbeats.
HeartbeatInterval time.Duration
// LivenessInterval determines the Store Liveness support expiration time.
LivenessInterval time.Duration
// SupportExpiryInterval determines how often Store Liveness checks if support
// should be withdrawn.
SupportExpiryInterval time.Duration
// IdleSupportFromInterval determines how ofter Store Liveness checks if any
// stores have not appeared in a SupportFrom call recently.
IdleSupportFromInterval time.Duration
// SupportWithdrawalGracePeriod determines how long Store Liveness should
// wait after restart before withdrawing support. It helps prevent support
// churn until the first heartbeats are delivered.
SupportWithdrawalGracePeriod time.Duration
}

// NewOptions instantiates the Store Liveness Options.
func NewOptions(
heartbeatInterval time.Duration,
livenessInterval time.Duration,
supportWithdrawalGracePeriod time.Duration,
) Options {
return Options{
HeartbeatInterval: heartbeatInterval,
LivenessInterval: livenessInterval,
SupportExpiryInterval: 1 * time.Second,
IdleSupportFromInterval: 1 * time.Minute,
SupportWithdrawalGracePeriod: supportWithdrawalGracePeriod,
}
}
28 changes: 7 additions & 21 deletions pkg/kv/kvserver/storeliveness/support_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,6 @@ var Enabled = settings.RegisterBoolSetting(
true,
)

type Options struct {
// HeartbeatInterval determines how often Store Liveness sends heartbeats.
HeartbeatInterval time.Duration
// LivenessInterval determines the Store Liveness support expiration time.
LivenessInterval time.Duration
// SupportExpiryInterval determines how often Store Liveness checks if support
// should be withdrawn.
SupportExpiryInterval time.Duration
// IdleSupportFromInterval determines how ofter Store Liveness checks if any
// stores have not appeared in a SupportFrom call recently.
IdleSupportFromInterval time.Duration
// SupportWithdrawalGracePeriod determines how long Store Liveness should
// wait after restart before withdrawing support. It helps prevent support
// churn until the first heartbeats are delivered.
SupportWithdrawalGracePeriod time.Duration
}

// MessageSender is the interface that defines how Store Liveness messages are
// sent. Transport is the production implementation of MessageSender.
type MessageSender interface {
Expand Down Expand Up @@ -131,6 +114,9 @@ func (sm *SupportManager) SupportFrom(id slpb.StoreIdent) (slpb.Epoch, hlc.Times
// uses a map to avoid duplicates, and the requesterStateHandler's
// addStore checks if the store exists before adding it.
sm.storesToAdd.addStore(id)
log.VInfof(context.Background(), 2,
"store %+v enqueued to add remote store %+v", sm.storeID, id,
)
return 0, hlc.Timestamp{}, false
}
// An empty expiration implies support has expired.
Expand Down Expand Up @@ -277,7 +263,7 @@ func (sm *SupportManager) sendHeartbeats(ctx context.Context) {
}
}
log.VInfof(
ctx, 2, "store %d sent heartbeats to %d stores", sm.storeID, len(heartbeats),
ctx, 2, "store %+v sent heartbeats to %d stores", sm.storeID, len(heartbeats),
)
}

Expand All @@ -294,7 +280,7 @@ func (sm *SupportManager) withdrawSupport(ctx context.Context) {
log.Warningf(ctx, "failed to write supporter meta: %v", err)
}
log.VInfof(
ctx, 2, "store %d withdrew support from %d stores",
ctx, 2, "store %+v withdrew support from %d stores",
sm.storeID, len(ssfu.inProgress.supportFor),
)
sm.supporterStateHandler.checkInUpdate(ssfu)
Expand All @@ -304,7 +290,7 @@ func (sm *SupportManager) withdrawSupport(ctx context.Context) {
// to either the requesterStateHandler or supporterStateHandler. It then writes
// all updates to disk in a single batch, and sends any responses via Transport.
func (sm *SupportManager) handleMessages(ctx context.Context, msgs []*slpb.Message) {
log.VInfof(ctx, 2, "store %d drained receive queue of size %d", sm.storeID, len(msgs))
log.VInfof(ctx, 2, "store %+v drained receive queue of size %d", sm.storeID, len(msgs))
rsfu := sm.requesterStateHandler.checkOutUpdate()
ssfu := sm.supporterStateHandler.checkOutUpdate()
var responses []slpb.Message
Expand Down Expand Up @@ -339,7 +325,7 @@ func (sm *SupportManager) handleMessages(ctx context.Context, msgs []*slpb.Messa
for _, response := range responses {
_ = sm.sender.SendAsync(response)
}
log.VInfof(ctx, 2, "store %d sent %d responses", sm.storeID, len(responses))
log.VInfof(ctx, 2, "store %+v sent %d responses", sm.storeID, len(responses))
}

// receiveQueue stores all received messages from the MessageHandler and allows
Expand Down
10 changes: 10 additions & 0 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,16 @@ func (c *Context) SetLoopbackDialer(loopbackDialFn func(context.Context) (net.Co
c.loopbackDialFn = loopbackDialFn
}

// StoreLivenessGracePeriod computes the grace period after a store restarts before which it will
// not withdraw support from other stores.
func (c *Context) StoreLivenessWithdrawalGracePeriod() time.Duration {
// RPCHeartbeatInterval and RPCHeartbeatTimeout ensure the remote store
// probes the RPC connection to the local store. DialTimeout ensures the
// remote store has enough time to dial the local store, and NetworkTimeout
// ensures the remote store's heartbeat is received by the local store.
return c.RPCHeartbeatInterval + c.RPCHeartbeatTimeout + base.DialTimeout + base.NetworkTimeout
}

// ContextOptions are passed to NewContext to set up a new *Context.
// All pointer fields and TenantID are required.
type ContextOptions struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ go_library(
"//pkg/kv/kvserver/rangefeed",
"//pkg/kv/kvserver/rangelog",
"//pkg/kv/kvserver/reports",
"//pkg/kv/kvserver/storeliveness",
"//pkg/multitenant",
"//pkg/multitenant/mtinfopb",
"//pkg/multitenant/multitenantcpu",
Expand Down
6 changes: 6 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
serverrangefeed "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangelog"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/reports"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities/tenantcapabilitiesauthorizer"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities/tenantcapabilitieswatcher"
Expand Down Expand Up @@ -648,6 +649,10 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
)
nodeRegistry.AddMetricStruct(raftTransport.Metrics())

storeLivenessTransport := storeliveness.NewTransport(
cfg.AmbientCtx, stopper, clock, kvNodeDialer, grpcServer.Server,
)

ctSender := sidetransport.NewSender(stopper, st, clock, kvNodeDialer)
ctReceiver := sidetransport.NewReceiver(nodeIDContainer, stopper, stores, nil /* testingKnobs */)

Expand Down Expand Up @@ -854,6 +859,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
Gossip: g,
NodeLiveness: nodeLiveness,
Transport: raftTransport,
StoreLivenessTransport: storeLivenessTransport,
NodeDialer: kvNodeDialer,
RPCContext: rpcContext,
ScanInterval: cfg.ScanInterval,
Expand Down
1 change: 1 addition & 0 deletions pkg/testutils/localtestcluster/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//pkg/kv/kvserver/closedts/sidetransport",
"//pkg/kv/kvserver/kvstorage",
"//pkg/kv/kvserver/liveness",
"//pkg/kv/kvserver/storeliveness",
"//pkg/multitenant/tenantcapabilities/tenantcapabilitiesauthorizer",
"//pkg/roachpb",
"//pkg/rpc",
Expand Down
5 changes: 5 additions & 0 deletions pkg/testutils/localtestcluster/local_test_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/sidetransport"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/storeliveness"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities/tenantcapabilitiesauthorizer"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
Expand Down Expand Up @@ -183,6 +184,9 @@ func (ltc *LocalTestCluster) Start(t testing.TB, initFactory InitFactoryFn) {
}
ltc.DB = kv.NewDBWithContext(cfg.AmbientCtx, factory, ltc.Clock, *ltc.dbContext)
transport := kvserver.NewDummyRaftTransport(cfg.AmbientCtx, cfg.Settings, ltc.Clock)
storeLivenessTransport := storeliveness.NewTransport(
cfg.AmbientCtx, ltc.stopper, ltc.Clock, nil, nil,
)
// By default, disable the replica scanner and split queue, which
// confuse tests using LocalTestCluster.
if ltc.StoreTestingKnobs == nil {
Expand Down Expand Up @@ -226,6 +230,7 @@ func (ltc *LocalTestCluster) Start(t testing.TB, initFactory InitFactoryFn) {
/* deterministic */ false,
)
cfg.Transport = transport
cfg.StoreLivenessTransport = storeLivenessTransport
cfg.ClosedTimestampReceiver = sidetransport.NewReceiver(nc, ltc.stopper, ltc.Stores, nil /* testingKnobs */)

if err := kvstorage.WriteClusterVersion(ctx, ltc.Eng, clusterversion.TestingClusterVersion); err != nil {
Expand Down

0 comments on commit ae8d001

Please sign in to comment.