Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
130025: raft: introduce and make use of SupportTracker  r=nvanbenschoten a=arulajmani

This patch adds a new type of tracker called a SupportTracker. Then,
leaders make use of this tracker to record fortification support from
followers on successful MsgFortifyLeaderResp messages.

Closes cockroachdb#125264

Release note: None

Co-authored-by: Arul Ajmani <[email protected]>
  • Loading branch information
craig[bot] and arulajmani committed Sep 6, 2024
2 parents 870c5fb + d498ae3 commit babaab7
Show file tree
Hide file tree
Showing 14 changed files with 658 additions and 36 deletions.
17 changes: 10 additions & 7 deletions pkg/raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,10 @@ func TestNodePropose(t *testing.T) {
n.Propose(context.TODO(), []byte("somedata"))
n.Stop()

require.Len(t, msgs, 1)
assert.Equal(t, raftpb.MsgProp, msgs[0].Type)
assert.Equal(t, []byte("somedata"), msgs[0].Entries[0].Data)
require.Len(t, msgs, 2)
assert.Equal(t, raftpb.MsgFortifyLeaderResp, msgs[0].Type)
assert.Equal(t, raftpb.MsgProp, msgs[1].Type)
assert.Equal(t, []byte("somedata"), msgs[1].Entries[0].Data)
}

// TestDisableProposalForwarding ensures that proposals are not forwarded to
Expand Down Expand Up @@ -230,9 +231,10 @@ func TestNodeProposeConfig(t *testing.T) {
n.ProposeConfChange(context.TODO(), cc)
n.Stop()

require.Len(t, msgs, 1)
assert.Equal(t, raftpb.MsgProp, msgs[0].Type)
assert.Equal(t, ccdata, msgs[0].Entries[0].Data)
require.Len(t, msgs, 2)
assert.Equal(t, raftpb.MsgFortifyLeaderResp, msgs[0].Type)
assert.Equal(t, raftpb.MsgProp, msgs[1].Type)
assert.Equal(t, ccdata, msgs[1].Entries[0].Data)
}

// TestNodeProposeAddDuplicateNode ensures that two proposes to add the same node should
Expand Down Expand Up @@ -378,7 +380,8 @@ func TestNodeProposeWaitDropped(t *testing.T) {
assert.Equal(t, ErrProposalDropped, n.Propose(context.Background(), droppingMsg))

n.Stop()
require.Empty(t, msgs)
require.Len(t, msgs, 1)
assert.Equal(t, raftpb.MsgFortifyLeaderResp, msgs[0].Type)
}

// TestNodeTick ensures that node.Tick() will increase the
Expand Down
36 changes: 18 additions & 18 deletions pkg/raft/quorum/quorum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,37 +35,37 @@ func TestLeadSupportExpiration(t *testing.T) {
testCases := []struct {
ids []pb.PeerID
support map[pb.PeerID]hlc.Timestamp
expQSE hlc.Timestamp
exp hlc.Timestamp
}{
{
ids: []pb.PeerID{1, 2, 3},
support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20), 3: ts(15)},
expQSE: ts(15),
exp: ts(15),
},
{
ids: []pb.PeerID{1, 2, 3, 4},
support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20), 3: ts(15), 4: ts(20)},
expQSE: ts(15),
exp: ts(15),
},
{
ids: []pb.PeerID{1, 2, 3, 4, 5},
support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20), 3: ts(15), 4: ts(20), 5: ts(20)},
expQSE: ts(20),
exp: ts(20),
},
{
ids: []pb.PeerID{1, 2, 3},
support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20)},
expQSE: ts(10),
exp: ts(10),
},
{
ids: []pb.PeerID{1, 2, 3},
support: map[pb.PeerID]hlc.Timestamp{1: ts(10)},
expQSE: hlc.Timestamp{},
exp: hlc.Timestamp{},
},
{
ids: []pb.PeerID{},
support: map[pb.PeerID]hlc.Timestamp{},
expQSE: hlc.MaxTimestamp,
exp: hlc.MaxTimestamp,
},
}

Expand All @@ -75,14 +75,14 @@ func TestLeadSupportExpiration(t *testing.T) {
m[id] = struct{}{}
}

require.Equal(t, tc.expQSE, m.LeadSupportExpiration(tc.support))
require.Equal(t, tc.exp, m.LeadSupportExpiration(tc.support))
}
}

// TestComputeQSEJointConfig ensures that the QSE is calculated correctly for
// joint configurations. In particular, it's the minimum of the two majority
// configs.
func TestComputeQSEJointConfig(t *testing.T) {
// TestLeadSupportExpirationJointConfig ensures that the LeadSupportExpiration
// is calculated correctly for joint configurations. In particular, it's the
// minimum of the two majority configs.
func TestLeadSupportExpirationJointConfig(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

Expand All @@ -96,31 +96,31 @@ func TestComputeQSEJointConfig(t *testing.T) {
cfg1 []pb.PeerID
cfg2 []pb.PeerID
support map[pb.PeerID]hlc.Timestamp
expQSE hlc.Timestamp
exp hlc.Timestamp
}{
{
cfg1: []pb.PeerID{1, 2, 3},
cfg2: []pb.PeerID{},
support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20), 3: ts(15)},
expQSE: ts(15), // cfg2 is empty, should behave like the (cfg1) majority config case
exp: ts(15), // cfg2 is empty, should behave like the (cfg1) majority config case
},
{
cfg1: []pb.PeerID{},
cfg2: []pb.PeerID{1, 2, 3},
support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20), 3: ts(15)},
expQSE: ts(15), // cfg1 is empty, should behave like the (cfg2) majority config case
exp: ts(15), // cfg1 is empty, should behave like the (cfg2) majority config case
},
{
cfg1: []pb.PeerID{3, 4, 5},
cfg2: []pb.PeerID{1, 2, 3},
support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20), 3: ts(15), 4: ts(20), 5: ts(25)},
expQSE: ts(15), // lower of the two
exp: ts(15), // lower of the two
},
{
cfg1: []pb.PeerID{3, 4, 5},
cfg2: []pb.PeerID{1, 2, 3},
support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20), 3: ts(15), 4: ts(10), 5: ts(10)},
expQSE: ts(10), // lower of the two; this time, cfg2 has the lower qse
exp: ts(10), // lower of the two; this time, cfg2 has the lower expiration
},
}

Expand All @@ -136,6 +136,6 @@ func TestComputeQSEJointConfig(t *testing.T) {
j[1][id] = struct{}{}
}

require.Equal(t, tc.expQSE, j.LeadSupportExpiration(tc.support))
require.Equal(t, tc.exp, j.LeadSupportExpiration(tc.support))
}
}
25 changes: 20 additions & 5 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ type raft struct {
config quorum.Config
trk tracker.ProgressTracker
electionTracker tracker.ElectionTracker
supportTracker tracker.SupportTracker

state StateType

Expand Down Expand Up @@ -469,7 +470,8 @@ func newRaft(c *Config) *raft {
}
lastID := r.raftLog.lastEntryID()

r.electionTracker = tracker.MakeVoteTracker(&r.config)
r.electionTracker = tracker.MakeElectionTracker(&r.config)
r.supportTracker = tracker.MakeSupportTracker(&r.config, r.storeLiveness)

cfg, progressMap, err := confchange.Restore(confchange.Changer{
Config: quorum.MakeEmptyConfig(),
Expand Down Expand Up @@ -726,8 +728,14 @@ func (r *raft) sendFortify(to pb.PeerID) {
epoch, live := r.storeLiveness.SupportFor(r.lead)
if live {
r.leadEpoch = epoch
// TODO(arul): For now, we're not recording any support on the leader. Do
// this once we implement handleFortifyResp correctly.
// The leader needs to persist the LeadEpoch durably before it can start
// supporting itself. We do so by sending a self-addressed
// MsgFortifyLeaderResp message so that it is added to the msgsAfterAppend
// slice and delivered back to this node only after LeadEpoch is
// persisted. At that point, this node can record support without
// discrimination for who is providing support (itself vs. other
// follower).
r.send(pb.Message{To: r.id, Type: pb.MsgFortifyLeaderResp, LeadEpoch: epoch})
} else {
r.logger.Infof(
"%x leader at term %d does not support itself in the liveness fabric", r.id, r.Term,
Expand Down Expand Up @@ -858,6 +866,7 @@ func (r *raft) reset(term uint64) {
r.abortLeaderTransfer()

r.electionTracker.ResetVotes()
r.supportTracker.Reset()
r.trk.Visit(func(id pb.PeerID, pr *tracker.Progress) {
*pr = tracker.Progress{
Match: 0,
Expand Down Expand Up @@ -2096,8 +2105,14 @@ func (r *raft) handleFortify(m pb.Message) {

func (r *raft) handleFortifyResp(m pb.Message) {
assertTrue(r.state == StateLeader, "only leaders should be handling fortification responses")
// TODO(arul): record support once
// https://github.com/cockroachdb/cockroach/issues/125264 lands.
if m.Reject {
// Couldn't successfully fortify the follower. Typically, this happens when
// the follower isn't supporting the leader's store in StoreLiveness or the
// follower is down. We'll try to fortify the follower again later in
// tickHeartbeat.
return
}
r.supportTracker.RecordSupport(m.From, m.LeadEpoch)
}

// deFortify (conceptually) revokes previously provided fortification support to
Expand Down
11 changes: 11 additions & 0 deletions pkg/raft/rafttest/interaction_env_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,17 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string {
// Explanation:
// 1 (from_store) grants support for 2 (for_store) at a higher epoch.
err = env.handleGrantSupport(t, d)
case "print-support-state":
// Prints the support state being tracked by a raft leader. Empty on a
// follower.
//
// print-support-state id
// Arguments are:
// id - id of the raft peer whose support map to print.
//
// Example:
// print-support-state 1
err = env.handlePrintSupportState(t, d)

default:
err = fmt.Errorf("unknown command")
Expand Down
10 changes: 10 additions & 0 deletions pkg/raft/rafttest/interaction_env_handler_raftstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package rafttest

import (
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/datadriven"
)

// isVoter checks whether node id is in the voter list within st.
Expand Down Expand Up @@ -51,3 +53,11 @@ func (env *InteractionEnv) handleRaftState() error {
}
return nil
}

// handlePrintSupportState pretty-prints the support map being tracked by a raft
// peer.
func (env *InteractionEnv) handlePrintSupportState(t *testing.T, d datadriven.TestData) error {
idx := firstAsNodeIdx(t, d)
fmt.Fprint(env.Output, env.Nodes[idx].TestingSupportStateString())
return nil
}
7 changes: 4 additions & 3 deletions pkg/raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,6 @@ func (rn *RawNode) LeadSupportStatus() LeadSupportStatus {
return getLeadSupportStatus(rn.raft)
}

// TODO(nvanbenschoten): remove this one the method is used.
var _ = (*RawNode).LeadSupportStatus

// ProgressType indicates the type of replica a Progress corresponds to.
type ProgressType byte

Expand Down Expand Up @@ -544,3 +541,7 @@ func (rn *RawNode) ForgetLeader() error {
func (rn *RawNode) TestingStepDown() error {
return rn.raft.testingStepDown()
}

func (rn *RawNode) TestingSupportStateString() string {
return rn.raft.supportTracker.String()
}
4 changes: 2 additions & 2 deletions pkg/raft/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func getStatus(r *raft) Status {
// NOTE: we assign to LeadSupportUntil even if RaftState is not currently
// StateLeader. The replica may have been the leader and stepped down to a
// follower before its lead support ran out.
s.LeadSupportUntil = hlc.Timestamp{} // TODO(arul): populate this field
s.LeadSupportUntil = r.supportTracker.LeadSupportUntil()
return s
}

Expand All @@ -155,7 +155,7 @@ func getSparseStatus(r *raft) SparseStatus {
func getLeadSupportStatus(r *raft) LeadSupportStatus {
var s LeadSupportStatus
s.BasicStatus = getBasicStatus(r)
s.LeadSupportUntil = hlc.Timestamp{} // TODO(arul): populate this field
s.LeadSupportUntil = r.supportTracker.LeadSupportUntil()
return s
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/raft/testdata/async_storage_writes.txt
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ stabilize
1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""]
1->AppendThread MsgStorageAppend Term:1 Log:1/11 Commit:10 Vote:1 Lead:1 LeadEpoch:1 Entries:[1/11 EntryNormal ""] Responses:[
1->1 MsgAppResp Term:1 Log:0/11 Commit:10
1->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:1
AppendThread->1 MsgStorageAppendResp Term:0 Log:1/11
]
> 2 receiving messages
Expand All @@ -107,6 +108,7 @@ stabilize
1->AppendThread MsgStorageAppend Term:1 Log:1/11 Commit:10 Vote:1 Lead:1 LeadEpoch:1 Entries:[1/11 EntryNormal ""]
Responses:
1->1 MsgAppResp Term:1 Log:0/11 Commit:10
1->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:1
AppendThread->1 MsgStorageAppendResp Term:0 Log:1/11
> 2 handling Ready
Ready MustSync=true:
Expand All @@ -132,6 +134,7 @@ stabilize
]
> 1 receiving messages
1->1 MsgAppResp Term:1 Log:0/11 Commit:10
1->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:1
AppendThread->1 MsgStorageAppendResp Term:0 Log:1/11
> 2 processing append thread
Processing:
Expand Down
2 changes: 2 additions & 0 deletions pkg/raft/testdata/async_storage_writes_append_aba_race.txt
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ Messages:
3->7 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""]
3->AppendThread MsgStorageAppend Term:2 Log:2/12 Commit:11 Vote:3 Lead:3 LeadEpoch:1 Entries:[2/12 EntryNormal ""] Responses:[
3->3 MsgAppResp Term:2 Log:0/12 Commit:11
3->3 MsgFortifyLeaderResp Term:2 Log:0/0 LeadEpoch:1
AppendThread->3 MsgStorageAppendResp Term:0 Log:2/12
]

Expand Down Expand Up @@ -383,6 +384,7 @@ Messages:
4->7 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""]
4->AppendThread MsgStorageAppend Term:3 Log:3/12 Commit:11 Vote:4 Lead:4 LeadEpoch:1 Entries:[3/12 EntryNormal ""] Responses:[
4->4 MsgAppResp Term:3 Log:0/12 Commit:11
4->4 MsgFortifyLeaderResp Term:3 Log:0/0 LeadEpoch:1
AppendThread->4 MsgStorageAppendResp Term:0 Log:3/12
]

Expand Down
Loading

0 comments on commit babaab7

Please sign in to comment.