diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go index f67279e64a5f..7fc88435f193 100644 --- a/pkg/raft/node_test.go +++ b/pkg/raft/node_test.go @@ -162,9 +162,10 @@ func TestNodePropose(t *testing.T) { n.Propose(context.TODO(), []byte("somedata")) n.Stop() - require.Len(t, msgs, 1) - assert.Equal(t, raftpb.MsgProp, msgs[0].Type) - assert.Equal(t, []byte("somedata"), msgs[0].Entries[0].Data) + require.Len(t, msgs, 2) + assert.Equal(t, raftpb.MsgFortifyLeaderResp, msgs[0].Type) + assert.Equal(t, raftpb.MsgProp, msgs[1].Type) + assert.Equal(t, []byte("somedata"), msgs[1].Entries[0].Data) } // TestDisableProposalForwarding ensures that proposals are not forwarded to @@ -230,9 +231,10 @@ func TestNodeProposeConfig(t *testing.T) { n.ProposeConfChange(context.TODO(), cc) n.Stop() - require.Len(t, msgs, 1) - assert.Equal(t, raftpb.MsgProp, msgs[0].Type) - assert.Equal(t, ccdata, msgs[0].Entries[0].Data) + require.Len(t, msgs, 2) + assert.Equal(t, raftpb.MsgFortifyLeaderResp, msgs[0].Type) + assert.Equal(t, raftpb.MsgProp, msgs[1].Type) + assert.Equal(t, ccdata, msgs[1].Entries[0].Data) } // TestNodeProposeAddDuplicateNode ensures that two proposes to add the same node should @@ -378,7 +380,8 @@ func TestNodeProposeWaitDropped(t *testing.T) { assert.Equal(t, ErrProposalDropped, n.Propose(context.Background(), droppingMsg)) n.Stop() - require.Empty(t, msgs) + require.Len(t, msgs, 1) + assert.Equal(t, raftpb.MsgFortifyLeaderResp, msgs[0].Type) } // TestNodeTick ensures that node.Tick() will increase the diff --git a/pkg/raft/quorum/quorum_test.go b/pkg/raft/quorum/quorum_test.go index bec04815a9fa..cd88dcab4e98 100644 --- a/pkg/raft/quorum/quorum_test.go +++ b/pkg/raft/quorum/quorum_test.go @@ -35,37 +35,37 @@ func TestLeadSupportExpiration(t *testing.T) { testCases := []struct { ids []pb.PeerID support map[pb.PeerID]hlc.Timestamp - expQSE hlc.Timestamp + exp hlc.Timestamp }{ { ids: []pb.PeerID{1, 2, 3}, support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20), 3: ts(15)}, - expQSE: ts(15), + exp: ts(15), }, { ids: []pb.PeerID{1, 2, 3, 4}, support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20), 3: ts(15), 4: ts(20)}, - expQSE: ts(15), + exp: ts(15), }, { ids: []pb.PeerID{1, 2, 3, 4, 5}, support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20), 3: ts(15), 4: ts(20), 5: ts(20)}, - expQSE: ts(20), + exp: ts(20), }, { ids: []pb.PeerID{1, 2, 3}, support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20)}, - expQSE: ts(10), + exp: ts(10), }, { ids: []pb.PeerID{1, 2, 3}, support: map[pb.PeerID]hlc.Timestamp{1: ts(10)}, - expQSE: hlc.Timestamp{}, + exp: hlc.Timestamp{}, }, { ids: []pb.PeerID{}, support: map[pb.PeerID]hlc.Timestamp{}, - expQSE: hlc.MaxTimestamp, + exp: hlc.MaxTimestamp, }, } @@ -75,14 +75,14 @@ func TestLeadSupportExpiration(t *testing.T) { m[id] = struct{}{} } - require.Equal(t, tc.expQSE, m.LeadSupportExpiration(tc.support)) + require.Equal(t, tc.exp, m.LeadSupportExpiration(tc.support)) } } -// TestComputeQSEJointConfig ensures that the QSE is calculated correctly for -// joint configurations. In particular, it's the minimum of the two majority -// configs. -func TestComputeQSEJointConfig(t *testing.T) { +// TestLeadSupportExpirationJointConfig ensures that the LeadSupportExpiration +// is calculated correctly for joint configurations. In particular, it's the +// minimum of the two majority configs. +func TestLeadSupportExpirationJointConfig(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -96,31 +96,31 @@ func TestComputeQSEJointConfig(t *testing.T) { cfg1 []pb.PeerID cfg2 []pb.PeerID support map[pb.PeerID]hlc.Timestamp - expQSE hlc.Timestamp + exp hlc.Timestamp }{ { cfg1: []pb.PeerID{1, 2, 3}, cfg2: []pb.PeerID{}, support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20), 3: ts(15)}, - expQSE: ts(15), // cfg2 is empty, should behave like the (cfg1) majority config case + exp: ts(15), // cfg2 is empty, should behave like the (cfg1) majority config case }, { cfg1: []pb.PeerID{}, cfg2: []pb.PeerID{1, 2, 3}, support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20), 3: ts(15)}, - expQSE: ts(15), // cfg1 is empty, should behave like the (cfg2) majority config case + exp: ts(15), // cfg1 is empty, should behave like the (cfg2) majority config case }, { cfg1: []pb.PeerID{3, 4, 5}, cfg2: []pb.PeerID{1, 2, 3}, support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20), 3: ts(15), 4: ts(20), 5: ts(25)}, - expQSE: ts(15), // lower of the two + exp: ts(15), // lower of the two }, { cfg1: []pb.PeerID{3, 4, 5}, cfg2: []pb.PeerID{1, 2, 3}, support: map[pb.PeerID]hlc.Timestamp{1: ts(10), 2: ts(20), 3: ts(15), 4: ts(10), 5: ts(10)}, - expQSE: ts(10), // lower of the two; this time, cfg2 has the lower qse + exp: ts(10), // lower of the two; this time, cfg2 has the lower expiration }, } @@ -136,6 +136,6 @@ func TestComputeQSEJointConfig(t *testing.T) { j[1][id] = struct{}{} } - require.Equal(t, tc.expQSE, j.LeadSupportExpiration(tc.support)) + require.Equal(t, tc.exp, j.LeadSupportExpiration(tc.support)) } } diff --git a/pkg/raft/raft.go b/pkg/raft/raft.go index 535072a5dafa..0a361724a162 100644 --- a/pkg/raft/raft.go +++ b/pkg/raft/raft.go @@ -336,6 +336,7 @@ type raft struct { config quorum.Config trk tracker.ProgressTracker electionTracker tracker.ElectionTracker + supportTracker tracker.SupportTracker state StateType @@ -469,7 +470,8 @@ func newRaft(c *Config) *raft { } lastID := r.raftLog.lastEntryID() - r.electionTracker = tracker.MakeVoteTracker(&r.config) + r.electionTracker = tracker.MakeElectionTracker(&r.config) + r.supportTracker = tracker.MakeSupportTracker(&r.config, r.storeLiveness) cfg, progressMap, err := confchange.Restore(confchange.Changer{ Config: quorum.MakeEmptyConfig(), @@ -726,8 +728,14 @@ func (r *raft) sendFortify(to pb.PeerID) { epoch, live := r.storeLiveness.SupportFor(r.lead) if live { r.leadEpoch = epoch - // TODO(arul): For now, we're not recording any support on the leader. Do - // this once we implement handleFortifyResp correctly. + // The leader needs to persist the LeadEpoch durably before it can start + // supporting itself. We do so by sending a self-addressed + // MsgFortifyLeaderResp message so that it is added to the msgsAfterAppend + // slice and delivered back to this node only after LeadEpoch is + // persisted. At that point, this node can record support without + // discrimination for who is providing support (itself vs. other + // follower). + r.send(pb.Message{To: r.id, Type: pb.MsgFortifyLeaderResp, LeadEpoch: epoch}) } else { r.logger.Infof( "%x leader at term %d does not support itself in the liveness fabric", r.id, r.Term, @@ -858,6 +866,7 @@ func (r *raft) reset(term uint64) { r.abortLeaderTransfer() r.electionTracker.ResetVotes() + r.supportTracker.Reset() r.trk.Visit(func(id pb.PeerID, pr *tracker.Progress) { *pr = tracker.Progress{ Match: 0, @@ -2096,8 +2105,14 @@ func (r *raft) handleFortify(m pb.Message) { func (r *raft) handleFortifyResp(m pb.Message) { assertTrue(r.state == StateLeader, "only leaders should be handling fortification responses") - // TODO(arul): record support once - // https://github.com/cockroachdb/cockroach/issues/125264 lands. + if m.Reject { + // Couldn't successfully fortify the follower. Typically, this happens when + // the follower isn't supporting the leader's store in StoreLiveness or the + // follower is down. We'll try to fortify the follower again later in + // tickHeartbeat. + return + } + r.supportTracker.RecordSupport(m.From, m.LeadEpoch) } // deFortify (conceptually) revokes previously provided fortification support to diff --git a/pkg/raft/rafttest/interaction_env_handler.go b/pkg/raft/rafttest/interaction_env_handler.go index e0c122803918..034f5b05a54b 100644 --- a/pkg/raft/rafttest/interaction_env_handler.go +++ b/pkg/raft/rafttest/interaction_env_handler.go @@ -262,6 +262,17 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string { // Explanation: // 1 (from_store) grants support for 2 (for_store) at a higher epoch. err = env.handleGrantSupport(t, d) + case "print-support-state": + // Prints the support state being tracked by a raft leader. Empty on a + // follower. + // + // print-support-state id + // Arguments are: + // id - id of the raft peer whose support map to print. + // + // Example: + // print-support-state 1 + err = env.handlePrintSupportState(t, d) default: err = fmt.Errorf("unknown command") diff --git a/pkg/raft/rafttest/interaction_env_handler_raftstate.go b/pkg/raft/rafttest/interaction_env_handler_raftstate.go index 960eb144ce38..512b433e1fb5 100644 --- a/pkg/raft/rafttest/interaction_env_handler_raftstate.go +++ b/pkg/raft/rafttest/interaction_env_handler_raftstate.go @@ -19,9 +19,11 @@ package rafttest import ( "fmt" + "testing" "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" + "github.com/cockroachdb/datadriven" ) // isVoter checks whether node id is in the voter list within st. @@ -51,3 +53,11 @@ func (env *InteractionEnv) handleRaftState() error { } return nil } + +// handlePrintSupportState pretty-prints the support map being tracked by a raft +// peer. +func (env *InteractionEnv) handlePrintSupportState(t *testing.T, d datadriven.TestData) error { + idx := firstAsNodeIdx(t, d) + fmt.Fprint(env.Output, env.Nodes[idx].TestingSupportStateString()) + return nil +} diff --git a/pkg/raft/rawnode.go b/pkg/raft/rawnode.go index 79f021b980ca..8e9b09798a2f 100644 --- a/pkg/raft/rawnode.go +++ b/pkg/raft/rawnode.go @@ -499,9 +499,6 @@ func (rn *RawNode) LeadSupportStatus() LeadSupportStatus { return getLeadSupportStatus(rn.raft) } -// TODO(nvanbenschoten): remove this one the method is used. -var _ = (*RawNode).LeadSupportStatus - // ProgressType indicates the type of replica a Progress corresponds to. type ProgressType byte @@ -544,3 +541,7 @@ func (rn *RawNode) ForgetLeader() error { func (rn *RawNode) TestingStepDown() error { return rn.raft.testingStepDown() } + +func (rn *RawNode) TestingSupportStateString() string { + return rn.raft.supportTracker.String() +} diff --git a/pkg/raft/status.go b/pkg/raft/status.go index 2c96d9bf5f7e..0ec5ffb8ffc3 100644 --- a/pkg/raft/status.go +++ b/pkg/raft/status.go @@ -131,7 +131,7 @@ func getStatus(r *raft) Status { // NOTE: we assign to LeadSupportUntil even if RaftState is not currently // StateLeader. The replica may have been the leader and stepped down to a // follower before its lead support ran out. - s.LeadSupportUntil = hlc.Timestamp{} // TODO(arul): populate this field + s.LeadSupportUntil = r.supportTracker.LeadSupportUntil() return s } @@ -155,7 +155,7 @@ func getSparseStatus(r *raft) SparseStatus { func getLeadSupportStatus(r *raft) LeadSupportStatus { var s LeadSupportStatus s.BasicStatus = getBasicStatus(r) - s.LeadSupportUntil = hlc.Timestamp{} // TODO(arul): populate this field + s.LeadSupportUntil = r.supportTracker.LeadSupportUntil() return s } diff --git a/pkg/raft/testdata/async_storage_writes.txt b/pkg/raft/testdata/async_storage_writes.txt index f75ae8efd131..de45ee670897 100644 --- a/pkg/raft/testdata/async_storage_writes.txt +++ b/pkg/raft/testdata/async_storage_writes.txt @@ -94,6 +94,7 @@ stabilize 1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] 1->AppendThread MsgStorageAppend Term:1 Log:1/11 Commit:10 Vote:1 Lead:1 LeadEpoch:1 Entries:[1/11 EntryNormal ""] Responses:[ 1->1 MsgAppResp Term:1 Log:0/11 Commit:10 + 1->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:1 AppendThread->1 MsgStorageAppendResp Term:0 Log:1/11 ] > 2 receiving messages @@ -107,6 +108,7 @@ stabilize 1->AppendThread MsgStorageAppend Term:1 Log:1/11 Commit:10 Vote:1 Lead:1 LeadEpoch:1 Entries:[1/11 EntryNormal ""] Responses: 1->1 MsgAppResp Term:1 Log:0/11 Commit:10 + 1->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:1 AppendThread->1 MsgStorageAppendResp Term:0 Log:1/11 > 2 handling Ready Ready MustSync=true: @@ -132,6 +134,7 @@ stabilize ] > 1 receiving messages 1->1 MsgAppResp Term:1 Log:0/11 Commit:10 + 1->1 MsgFortifyLeaderResp Term:1 Log:0/0 LeadEpoch:1 AppendThread->1 MsgStorageAppendResp Term:0 Log:1/11 > 2 processing append thread Processing: diff --git a/pkg/raft/testdata/async_storage_writes_append_aba_race.txt b/pkg/raft/testdata/async_storage_writes_append_aba_race.txt index a2212260593d..c1f6e0a6ce79 100644 --- a/pkg/raft/testdata/async_storage_writes_append_aba_race.txt +++ b/pkg/raft/testdata/async_storage_writes_append_aba_race.txt @@ -214,6 +214,7 @@ Messages: 3->7 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] 3->AppendThread MsgStorageAppend Term:2 Log:2/12 Commit:11 Vote:3 Lead:3 LeadEpoch:1 Entries:[2/12 EntryNormal ""] Responses:[ 3->3 MsgAppResp Term:2 Log:0/12 Commit:11 + 3->3 MsgFortifyLeaderResp Term:2 Log:0/0 LeadEpoch:1 AppendThread->3 MsgStorageAppendResp Term:0 Log:2/12 ] @@ -383,6 +384,7 @@ Messages: 4->7 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] 4->AppendThread MsgStorageAppend Term:3 Log:3/12 Commit:11 Vote:4 Lead:4 LeadEpoch:1 Entries:[3/12 EntryNormal ""] Responses:[ 4->4 MsgAppResp Term:3 Log:0/12 Commit:11 + 4->4 MsgFortifyLeaderResp Term:3 Log:0/0 LeadEpoch:1 AppendThread->4 MsgStorageAppendResp Term:0 Log:3/12 ] diff --git a/pkg/raft/testdata/fortification_support_tracking.txt b/pkg/raft/testdata/fortification_support_tracking.txt new file mode 100644 index 000000000000..e6c30992e891 --- /dev/null +++ b/pkg/raft/testdata/fortification_support_tracking.txt @@ -0,0 +1,281 @@ +# Test to ensure that leaders correctly track fortification support. + +log-level debug +---- +ok + +add-nodes 3 voters=(1,2,3) index=10 +---- +INFO 1 switched to configuration voters=(1 2 3) +INFO 1 became follower at term 0 +INFO newRaft 1 [peers: [1,2,3], term: 0, commit: 10, applied: 10, lastindex: 10, lastterm: 1] +INFO 2 switched to configuration voters=(1 2 3) +INFO 2 became follower at term 0 +INFO newRaft 2 [peers: [1,2,3], term: 0, commit: 10, applied: 10, lastindex: 10, lastterm: 1] +INFO 3 switched to configuration voters=(1 2 3) +INFO 3 became follower at term 0 +INFO newRaft 3 [peers: [1,2,3], term: 0, commit: 10, applied: 10, lastindex: 10, lastterm: 1] + +withdraw-support 2 1 +---- + 1 2 3 +1 1 1 1 +2 x 1 1 +3 1 1 1 + +withdraw-support 3 1 +---- + 1 2 3 +1 1 1 1 +2 x 1 1 +3 x 1 1 + +campaign 1 +---- +INFO 1 is starting a new election at term 0 +INFO 1 became candidate at term 1 +INFO 1 [logterm: 1, index: 10] sent MsgVote request to 2 at term 1 +INFO 1 [logterm: 1, index: 10] sent MsgVote request to 3 at term 1 + +stabilize +---- +> 1 handling Ready + Ready MustSync=true: + State:StateCandidate + HardState Term:1 Vote:1 Commit:10 Lead:0 LeadEpoch:0 + Messages: + 1->2 MsgVote Term:1 Log:1/10 + 1->3 MsgVote Term:1 Log:1/10 + INFO 1 received MsgVoteResp from 1 at term 1 + INFO 1 has received 1 MsgVoteResp votes and 0 vote rejections +> 2 receiving messages + 1->2 MsgVote Term:1 Log:1/10 + INFO 2 [term: 0] received a MsgVote message with higher term from 1 [term: 1] + INFO 2 became follower at term 1 + INFO 2 [logterm: 1, index: 10, vote: 0] cast MsgVote for 1 [logterm: 1, index: 10] at term 1 +> 3 receiving messages + 1->3 MsgVote Term:1 Log:1/10 + INFO 3 [term: 0] received a MsgVote message with higher term from 1 [term: 1] + INFO 3 became follower at term 1 + INFO 3 [logterm: 1, index: 10, vote: 0] cast MsgVote for 1 [logterm: 1, index: 10] at term 1 +> 2 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:10 Lead:0 LeadEpoch:0 + Messages: + 2->1 MsgVoteResp Term:1 Log:0/0 +> 3 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:10 Lead:0 LeadEpoch:0 + Messages: + 3->1 MsgVoteResp Term:1 Log:0/0 +> 1 receiving messages + 2->1 MsgVoteResp Term:1 Log:0/0 + INFO 1 received MsgVoteResp from 2 at term 1 + INFO 1 has received 2 MsgVoteResp votes and 0 vote rejections + INFO 1 became leader at term 1 + 3->1 MsgVoteResp Term:1 Log:0/0 +> 1 handling Ready + Ready MustSync=true: + State:StateLeader + HardState Term:1 Vote:1 Commit:10 Lead:1 LeadEpoch:1 + Entries: + 1/11 EntryNormal "" + Messages: + 1->2 MsgFortifyLeader Term:1 Log:0/0 + 1->3 MsgFortifyLeader Term:1 Log:0/0 + 1->2 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] + 1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] +> 2 receiving messages + 1->2 MsgFortifyLeader Term:1 Log:0/0 + 1->2 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] +> 3 receiving messages + 1->3 MsgFortifyLeader Term:1 Log:0/0 + 1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] +> 2 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:10 Lead:1 LeadEpoch:0 + Entries: + 1/11 EntryNormal "" + Messages: + 2->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0) + 2->1 MsgAppResp Term:1 Log:0/11 Commit:10 +> 3 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:10 Lead:1 LeadEpoch:0 + Entries: + 1/11 EntryNormal "" + Messages: + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0) + 3->1 MsgAppResp Term:1 Log:0/11 Commit:10 +> 1 receiving messages + 2->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0) + 2->1 MsgAppResp Term:1 Log:0/11 Commit:10 + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0) + 3->1 MsgAppResp Term:1 Log:0/11 Commit:10 +> 1 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:11 Lead:1 LeadEpoch:1 + CommittedEntries: + 1/11 EntryNormal "" + Messages: + 1->2 MsgApp Term:1 Log:1/11 Commit:11 + 1->3 MsgApp Term:1 Log:1/11 Commit:11 +> 2 receiving messages + 1->2 MsgApp Term:1 Log:1/11 Commit:11 +> 3 receiving messages + 1->3 MsgApp Term:1 Log:1/11 Commit:11 +> 2 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:11 Lead:1 LeadEpoch:0 + CommittedEntries: + 1/11 EntryNormal "" + Messages: + 2->1 MsgAppResp Term:1 Log:0/11 Commit:11 +> 3 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:11 Lead:1 LeadEpoch:0 + CommittedEntries: + 1/11 EntryNormal "" + Messages: + 3->1 MsgAppResp Term:1 Log:0/11 Commit:11 +> 1 receiving messages + 2->1 MsgAppResp Term:1 Log:0/11 Commit:11 + 3->1 MsgAppResp Term:1 Log:0/11 Commit:11 + +print-support-state 1 +---- +1 : 1 + +bump-epoch 2 +---- + 1 2 3 +1 1 2 1 +2 x 2 1 +3 x 2 1 + +withdraw-support 3 2 +---- + 1 2 3 +1 1 2 1 +2 x 2 1 +3 x x 1 + +grant-support 3 2 +---- + 1 2 3 +1 1 2 1 +2 x 3 1 +3 x 3 1 + +campaign 2 +---- +INFO 2 is starting a new election at term 1 +INFO 2 became candidate at term 2 +INFO 2 [logterm: 1, index: 11] sent MsgVote request to 1 at term 2 +INFO 2 [logterm: 1, index: 11] sent MsgVote request to 3 at term 2 + +stabilize +---- +> 2 handling Ready + Ready MustSync=true: + State:StateCandidate + HardState Term:2 Vote:2 Commit:11 Lead:0 LeadEpoch:0 + Messages: + 2->1 MsgVote Term:2 Log:1/11 + 2->3 MsgVote Term:2 Log:1/11 + INFO 2 received MsgVoteResp from 2 at term 2 + INFO 2 has received 1 MsgVoteResp votes and 0 vote rejections +> 1 receiving messages + 2->1 MsgVote Term:2 Log:1/11 + INFO 1 [logterm: 1, index: 11, vote: 1] ignored MsgVote from 2 [logterm: 1, index: 11] at term 1: supporting fortified leader 1 at epoch 1 +> 3 receiving messages + 2->3 MsgVote Term:2 Log:1/11 + INFO 3 [term: 1] received a MsgVote message with higher term from 2 [term: 2] + INFO 3 became follower at term 2 + INFO 3 [logterm: 1, index: 11, vote: 0] cast MsgVote for 2 [logterm: 1, index: 11] at term 2 +> 3 handling Ready + Ready MustSync=true: + HardState Term:2 Vote:2 Commit:11 Lead:0 LeadEpoch:0 + Messages: + 3->2 MsgVoteResp Term:2 Log:0/0 +> 2 receiving messages + 3->2 MsgVoteResp Term:2 Log:0/0 + INFO 2 received MsgVoteResp from 3 at term 2 + INFO 2 has received 2 MsgVoteResp votes and 0 vote rejections + INFO 2 became leader at term 2 +> 2 handling Ready + Ready MustSync=true: + State:StateLeader + HardState Term:2 Vote:2 Commit:11 Lead:2 LeadEpoch:3 + Entries: + 2/12 EntryNormal "" + Messages: + 2->1 MsgFortifyLeader Term:2 Log:0/0 + 2->3 MsgFortifyLeader Term:2 Log:0/0 + 2->1 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] + 2->3 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] +> 1 receiving messages + 2->1 MsgFortifyLeader Term:2 Log:0/0 + INFO 1 [term: 1] received a MsgFortifyLeader message with higher term from 2 [term: 2] + INFO 1 became follower at term 2 + 2->1 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] +> 3 receiving messages + 2->3 MsgFortifyLeader Term:2 Log:0/0 + 2->3 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] +> 1 handling Ready + Ready MustSync=true: + State:StateFollower + HardState Term:2 Commit:11 Lead:2 LeadEpoch:2 + Entries: + 2/12 EntryNormal "" + Messages: + 1->2 MsgFortifyLeaderResp Term:2 Log:0/0 LeadEpoch:2 + 1->2 MsgAppResp Term:2 Log:0/12 Commit:11 +> 3 handling Ready + Ready MustSync=true: + HardState Term:2 Vote:2 Commit:11 Lead:2 LeadEpoch:3 + Entries: + 2/12 EntryNormal "" + Messages: + 3->2 MsgFortifyLeaderResp Term:2 Log:0/0 LeadEpoch:3 + 3->2 MsgAppResp Term:2 Log:0/12 Commit:11 +> 2 receiving messages + 1->2 MsgFortifyLeaderResp Term:2 Log:0/0 LeadEpoch:2 + 1->2 MsgAppResp Term:2 Log:0/12 Commit:11 + 3->2 MsgFortifyLeaderResp Term:2 Log:0/0 LeadEpoch:3 + 3->2 MsgAppResp Term:2 Log:0/12 Commit:11 +> 2 handling Ready + Ready MustSync=true: + HardState Term:2 Vote:2 Commit:12 Lead:2 LeadEpoch:3 + CommittedEntries: + 2/12 EntryNormal "" + Messages: + 2->1 MsgApp Term:2 Log:2/12 Commit:12 + 2->3 MsgApp Term:2 Log:2/12 Commit:12 +> 1 receiving messages + 2->1 MsgApp Term:2 Log:2/12 Commit:12 +> 3 receiving messages + 2->3 MsgApp Term:2 Log:2/12 Commit:12 +> 1 handling Ready + Ready MustSync=true: + HardState Term:2 Commit:12 Lead:2 LeadEpoch:2 + CommittedEntries: + 2/12 EntryNormal "" + Messages: + 1->2 MsgAppResp Term:2 Log:0/12 Commit:12 +> 3 handling Ready + Ready MustSync=true: + HardState Term:2 Vote:2 Commit:12 Lead:2 LeadEpoch:3 + CommittedEntries: + 2/12 EntryNormal "" + Messages: + 3->2 MsgAppResp Term:2 Log:0/12 Commit:12 +> 2 receiving messages + 1->2 MsgAppResp Term:2 Log:0/12 Commit:12 + 3->2 MsgAppResp Term:2 Log:0/12 Commit:12 + +print-support-state 2 +---- +1 : 2 +2 : 3 +3 : 3 diff --git a/pkg/raft/tracker/BUILD.bazel b/pkg/raft/tracker/BUILD.bazel index b82995d048c5..02db000787f7 100644 --- a/pkg/raft/tracker/BUILD.bazel +++ b/pkg/raft/tracker/BUILD.bazel @@ -8,12 +8,15 @@ go_library( "progress.go", "progresstracker.go", "state.go", + "supporttracker.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/raft/tracker", visibility = ["//visibility:public"], deps = [ "//pkg/raft/quorum", "//pkg/raft/raftpb", + "//pkg/raft/raftstoreliveness", + "//pkg/util/hlc", ], ) @@ -22,9 +25,16 @@ go_test( srcs = [ "inflights_test.go", "progress_test.go", + "supporttracker_test.go", ], embed = [":tracker"], deps = [ + "//pkg/raft/quorum", + "//pkg/raft/raftpb", + "//pkg/raft/raftstoreliveness", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "//pkg/util/log", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", ], diff --git a/pkg/raft/tracker/electiontracker.go b/pkg/raft/tracker/electiontracker.go index f97f823f7ce7..77a28997faf0 100644 --- a/pkg/raft/tracker/electiontracker.go +++ b/pkg/raft/tracker/electiontracker.go @@ -23,7 +23,7 @@ type ElectionTracker struct { votes map[pb.PeerID]bool } -func MakeVoteTracker(config *quorum.Config) ElectionTracker { +func MakeElectionTracker(config *quorum.Config) ElectionTracker { return ElectionTracker{ config: config, votes: map[pb.PeerID]bool{}, diff --git a/pkg/raft/tracker/supporttracker.go b/pkg/raft/tracker/supporttracker.go new file mode 100644 index 000000000000..02b231d7aea4 --- /dev/null +++ b/pkg/raft/tracker/supporttracker.go @@ -0,0 +1,99 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tracker + +import ( + "fmt" + "slices" + "strings" + + "github.com/cockroachdb/cockroach/pkg/raft/quorum" + pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb" + "github.com/cockroachdb/cockroach/pkg/raft/raftstoreliveness" + "github.com/cockroachdb/cockroach/pkg/util/hlc" +) + +// SupportTracker is used to track fortification support from peers. This can +// then be used to compute until when a leader's support expires. +type SupportTracker struct { + config *quorum.Config + storeLiveness raftstoreliveness.StoreLiveness + + // support contains a map of nodes which have supported the leader through + // fortification handshakes, and the corresponding Store Liveness epochs that + // they have supported the leader in. + support map[pb.PeerID]pb.Epoch +} + +// MakeSupportTracker initializes a SupportTracker. +func MakeSupportTracker( + config *quorum.Config, storeLiveness raftstoreliveness.StoreLiveness, +) SupportTracker { + st := SupportTracker{ + config: config, + storeLiveness: storeLiveness, + support: map[pb.PeerID]pb.Epoch{}, + } + return st +} + +// RecordSupport records that the node with the given id supported this Raft +// instance until the supplied timestamp. +func (st *SupportTracker) RecordSupport(id pb.PeerID, epoch pb.Epoch) { + // The supported epoch should never regress. Guard against out of order + // delivery of fortify responses by using max. + st.support[id] = max(st.support[id], epoch) +} + +// Reset clears out any previously tracked support. +func (st *SupportTracker) Reset() { + clear(st.support) + // TODO(arul): when we introduce st.LeadSupportUntil we need to make sure it + // isn't reset here, because we don't want it to regress when a leader steps + // down. +} + +// LeadSupportUntil returns the timestamp until which the leader is guaranteed +// support until based on the support being tracked for it by its peers. +func (st *SupportTracker) LeadSupportUntil() hlc.Timestamp { + // TODO(arul): avoid this map allocation as we're calling LeadSupportUntil + // from hot paths. + supportExpMap := make(map[pb.PeerID]hlc.Timestamp) + for id, supportEpoch := range st.support { + curEpoch, curExp, ok := st.storeLiveness.SupportFrom(id) + // NB: We can't assert that supportEpoch <= curEpoch because there may be a + // race between a successful MsgFortifyLeaderResp and the store liveness + // heartbeat response that lets the leader know the follower's store is + // supporting the leader's store at the epoch in the MsgFortifyLeaderResp + // message. + if ok && curEpoch == supportEpoch { + supportExpMap[id] = curExp + } + } + return st.config.Voters.LeadSupportExpiration(supportExpMap) +} + +func (st *SupportTracker) String() string { + if len(st.support) == 0 { + return "empty" + } + // Print the map in sorted order as we assert on its output in tests. + ids := make([]pb.PeerID, 0, len(st.support)) + for id := range st.support { + ids = append(ids, id) + } + slices.Sort(ids) + var buf strings.Builder + for _, id := range ids { + fmt.Fprintf(&buf, "%d : %d\n", id, st.support[id]) + } + return buf.String() +} diff --git a/pkg/raft/tracker/supporttracker_test.go b/pkg/raft/tracker/supporttracker_test.go new file mode 100644 index 000000000000..36f52cea1a33 --- /dev/null +++ b/pkg/raft/tracker/supporttracker_test.go @@ -0,0 +1,187 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tracker + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/raft/quorum" + pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb" + "github.com/cockroachdb/cockroach/pkg/raft/raftstoreliveness" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +func TestLeadSupportUntil(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ts := func(ts int64) hlc.Timestamp { + return hlc.Timestamp{ + WallTime: ts, + } + } + + mockLiveness3Peers := makeMockStoreLiveness( + map[pb.PeerID]mockLivenessEntry{ + 1: makeMockLivenessEntry(10, ts(10)), + 2: makeMockLivenessEntry(20, ts(15)), + 3: makeMockLivenessEntry(30, ts(20)), + }, + ) + + testCases := []struct { + ids []pb.PeerID + storeLiveness raftstoreliveness.StoreLiveness + setup func(tracker *SupportTracker) + expTS hlc.Timestamp + }{ + { + ids: []pb.PeerID{1, 2, 3}, + storeLiveness: mockLiveness3Peers, + setup: func(supportTracker *SupportTracker) { + // No support recorded. + }, + expTS: hlc.Timestamp{}, + }, + { + ids: []pb.PeerID{1, 2, 3}, + storeLiveness: mockLiveness3Peers, + setup: func(supportTracker *SupportTracker) { + supportTracker.RecordSupport(1, 10) + }, + expTS: hlc.Timestamp{}, + }, + { + ids: []pb.PeerID{1, 2, 3}, + storeLiveness: mockLiveness3Peers, + setup: func(supportTracker *SupportTracker) { + supportTracker.RecordSupport(1, 10) + supportTracker.RecordSupport(3, 30) + }, + expTS: ts(10), + }, + { + ids: []pb.PeerID{1, 2, 3}, + storeLiveness: mockLiveness3Peers, + setup: func(supportTracker *SupportTracker) { + supportTracker.RecordSupport(1, 10) + supportTracker.RecordSupport(3, 30) + supportTracker.RecordSupport(2, 20) + }, + expTS: ts(15), + }, + { + ids: []pb.PeerID{1, 2, 3}, + storeLiveness: mockLiveness3Peers, + setup: func(supportTracker *SupportTracker) { + // Record support at epochs at expired epochs. + supportTracker.RecordSupport(1, 9) + supportTracker.RecordSupport(3, 29) + supportTracker.RecordSupport(2, 19) + }, + expTS: hlc.Timestamp{}, + }, + { + ids: []pb.PeerID{1, 2, 3}, + storeLiveness: mockLiveness3Peers, + setup: func(supportTracker *SupportTracker) { + // Record support at newer epochs than what are present in + // StoreLiveness. + // + // NB: This is possible if there is a race between store liveness + // heartbeats updates and fortification responses. + supportTracker.RecordSupport(1, 11) + supportTracker.RecordSupport(3, 31) + supportTracker.RecordSupport(2, 21) + }, + expTS: hlc.Timestamp{}, + }, + { + ids: []pb.PeerID{1, 2, 3}, + storeLiveness: mockLiveness3Peers, + setup: func(supportTracker *SupportTracker) { + // One of the epochs being supported is expired. + supportTracker.RecordSupport(1, 10) + supportTracker.RecordSupport(3, 29) // expired + supportTracker.RecordSupport(2, 20) + }, + expTS: ts(10), + }, + { + ids: []pb.PeerID{1, 2, 3}, + storeLiveness: mockLiveness3Peers, + setup: func(supportTracker *SupportTracker) { + // Two of the epochs being supported is expired. + supportTracker.RecordSupport(1, 10) + supportTracker.RecordSupport(3, 29) // expired + supportTracker.RecordSupport(2, 19) // expired + }, + expTS: hlc.Timestamp{}, + }, + } + + for _, tc := range testCases { + cfg := quorum.MakeEmptyConfig() + for _, id := range tc.ids { + cfg.Voters[0][id] = struct{}{} + } + supportTracker := MakeSupportTracker(&cfg, tc.storeLiveness) + + tc.setup(&supportTracker) + require.Equal(t, tc.expTS, supportTracker.LeadSupportUntil()) + } +} + +type mockLivenessEntry struct { + epoch pb.Epoch + ts hlc.Timestamp +} + +func makeMockLivenessEntry(epoch pb.Epoch, ts hlc.Timestamp) mockLivenessEntry { + return mockLivenessEntry{ + epoch: epoch, + ts: ts, + } +} + +type mockStoreLiveness struct { + liveness map[pb.PeerID]mockLivenessEntry +} + +func makeMockStoreLiveness(liveness map[pb.PeerID]mockLivenessEntry) mockStoreLiveness { + return mockStoreLiveness{ + liveness: liveness, + } +} + +// SupportFor implements the raftstoreliveness.StoreLiveness interface. +func (mockStoreLiveness) SupportFor(pb.PeerID) (pb.Epoch, bool) { + panic("unimplemented") +} + +// SupportFrom implements the raftstoreliveness.StoreLiveness interface. +func (m mockStoreLiveness) SupportFrom(id pb.PeerID) (pb.Epoch, hlc.Timestamp, bool) { + entry := m.liveness[id] + return entry.epoch, entry.ts, true +} + +// SupportFromEnabled implements the raftstoreliveness.StoreLiveness interface. +func (mockStoreLiveness) SupportFromEnabled() bool { + return true +} + +// SupportExpired implements the raftstoreliveness.StoreLiveness interface. +func (mockStoreLiveness) SupportExpired(hlc.Timestamp) bool { + panic("unimplemented") +}