130688: replica_rac2: move raft write registering r=sumeerbhola a=pav-kv

This commit moves registering all raft log/snapshot writes for RACv2 into `HandleRaftReadyRaftMuLocked`, so that the `maybeSendAdmittedRaftMuLocked` call always happens after the latest write has been registered.

This way, the admitted vector in `logTracker` carries the latest leader term we have observed from raft, and `maybeSendAdmittedRaftMuLocked` won't skip it when the term changes.

Part of cockroachdb#129508
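
To illustrate the ordering issue, here is a minimal toy sketch (all types and names below are hypothetical stand-ins, not the CockroachDB code): reading the admitted vector before registering a newer-term write yields a vector stamped with the stale term, which the new leader ignores; registering first makes the vector carry the latest term.

```go
package main

import "fmt"

// logMark and admittedVector are hypothetical stand-ins for rac2.LogMark and
// the tracker's admitted vector.
type logMark struct{ term, index uint64 }

type admittedVector struct{ term, index uint64 }

// toyTracker mimics one property of logTracker: the admitted vector it
// reports is stamped with the latest leader term observed via registered
// writes.
type toyTracker struct{ last logMark }

func (t *toyTracker) registerAppend(to logMark) {
	if to.term > t.last.term || (to.term == t.last.term && to.index > t.last.index) {
		t.last = to
	}
}

func (t *toyTracker) admitted() admittedVector {
	// The real tracker bounds admitted indices by the stable index; the toy
	// only models the term stamping.
	return admittedVector{term: t.last.term, index: t.last.index}
}

func main() {
	tr := &toyTracker{last: logMark{term: 51, index: 26}}

	// A raft Ready arrives with a write at the new leader term 52. Reading
	// the admitted vector before registering that write yields stale term 51.
	stale := tr.admitted()

	// The ordering this commit enforces: register the write, then read.
	tr.registerAppend(logMark{term: 52, index: 28})
	fresh := tr.admitted()

	fmt.Printf("before registering: term=%d (a term-52 leader drops it)\n", stale.term)
	fmt.Printf("after registering:  term=%d (carries the latest term)\n", fresh.term)
}
```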

Co-authored-by: Pavel Kalinnikov <[email protected]>
craig[bot] and pav-kv committed Sep 16, 2024
2 parents a50d588 + e45ae1a commit d33d8b6
Showing 8 changed files with 100 additions and 57 deletions.
41 changes: 22 additions & 19 deletions pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go
@@ -114,6 +114,28 @@ func (l *LogTracker) Admitted() AdmittedVector {
return a
}

// Snap informs the tracker that a snapshot at the given log mark is about to be
// sent to storage, and the log will be truncated.
//
// Returns true if the admitted vector has changed. The only way it can change
// is when the leader term goes up, in which case the indices can regress
// because the log is truncated by this newer-term write.
func (l *LogTracker) Snap(ctx context.Context, mark LogMark) bool {
if !mark.After(l.last) {
l.errorf(ctx, "registering stale snapshot %+v", mark)
return false
}
// Fake an append spanning the gap between the log and the snapshot. It will,
// if necessary, truncate the stable index and remove entries waiting for
// admission that became obsolete.
//
// Some entries waiting in the queue remain queued, even though not all of
// them are necessarily in the pre-image of the snapshot. We choose this in
// order to retain some backpressure: all these entries were written anyway,
// and are waiting for storage admission.
return l.Append(ctx, min(l.last.Index, mark.Index), mark)
}

// Append informs the tracker that log entries at indices (after, to.Index] are
// about to be sent to stable storage, on behalf of the to.Term leader.
//
@@ -256,25 +278,6 @@ func (l *LogTracker) LogAdmitted(ctx context.Context, at LogMark, pri raftpb.Pri
return updated
}

// SnapSynced informs the tracker that a snapshot at the given log mark has been
// stored/synced, and the log is cleared.
//
// Returns true if the admitted vector has changed.
func (l *LogTracker) SnapSynced(ctx context.Context, mark LogMark) bool {
if !mark.After(l.last) {
l.errorf(ctx, "syncing stale snapshot %+v", mark)
return false
}
// Fake an append spanning the gap between the log and the snapshot. It will,
// if necessary, truncate the stable index and remove entries waiting for
// admission that became obsolete.
updated := l.Append(ctx, min(l.last.Index, mark.Index), mark)
if l.LogSynced(ctx, mark) {
return true
}
return updated
}

// String returns a string representation of the LogTracker.
func (l *LogTracker) String() string {
return redact.StringWithoutMarkers(l)
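As a concrete illustration of the `Snap` semantics above, here is a sketch in the style of this package's tests (it assumes in-package access to the unexported `stable` field and the test file's `mark` helper; an illustrative sketch, not part of the change):

```go
func TestLogTrackerSnapSketch(t *testing.T) {
	ctx := context.Background()
	// Log at term 5 up to index 20, stable up to index 15.
	l := NewLogTracker(mark(5, 20))
	l.stable = 15
	// A snapshot at term 6, index 10 replaces the log: the last mark jumps
	// to {6, 10} and the stable index regresses to 10.
	updated := l.Snap(ctx, mark(6, 10))
	require.True(t, updated) // the admitted vector now carries term 6
	require.Equal(t, mark(6, 10), l.Stable())
}
```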
18 changes: 12 additions & 6 deletions pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker_test.go
@@ -156,23 +156,24 @@ func TestLogTrackerLogSynced(t *testing.T) {
}
}

func TestLogTrackerSnapSynced(t *testing.T) {
func TestLogTrackerSnap(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

for _, tt := range []struct {
last LogMark
stable uint64
snap LogMark
want LogMark // stable mark after registering the snap
notOk bool
}{
// Invalid snapshots.
{last: mark(5, 20), snap: mark(4, 30), notOk: true},
{last: mark(5, 20), snap: mark(5, 10), notOk: true},
// Valid snapshots.
{last: mark(5, 20), snap: mark(5, 30)},
{last: mark(5, 20), snap: mark(6, 10)},
{last: mark(5, 20), snap: mark(6, 30)},
{last: mark(5, 20), stable: 20, snap: mark(5, 30), want: mark(5, 20)},
{last: mark(5, 20), stable: 15, snap: mark(6, 10), want: mark(6, 10)},
{last: mark(5, 20), stable: 20, snap: mark(6, 30), want: mark(6, 20)},
} {
t.Run("", func(t *testing.T) {
defer func() {
@@ -182,10 +183,10 @@ func TestLogTrackerSnapSynced(t *testing.T) {
l := NewLogTracker(tt.last)
l.stable = tt.stable
l.check(t)
l.SnapSynced(context.Background(), tt.snap)
l.Snap(context.Background(), tt.snap)
l.check(t)
require.Equal(t, tt.snap, l.last)
require.Equal(t, tt.snap.Index, l.Stable().Index)
require.Equal(t, tt.want, l.Stable())
})
}
}
@@ -240,6 +241,11 @@ func TestLogTracker(t *testing.T) {
updated := tracker.Append(ctx, after, to)
return state(updated)

case "snap":
mark := readMark(t, d, "index")
updated := tracker.Snap(ctx, mark)
return state(updated)

case "sync": // Example: sync term=10 index=100
mark := readMark(t, d, "index")
updated := tracker.LogSynced(ctx, mark)
29 changes: 29 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/testdata/log_tracker
@@ -47,6 +47,35 @@ append term=2 after=5 to=10
----
[upd] mark:{Term:2 Index:10}, stable:5, admitted:[5 5 5 5]

register term=2 index=6 pri=LowPri
----
mark:{Term:2 Index:10}, stable:5, admitted:[5 5 5 5]
LowPri: {Term:2 Index:6}

# A snapshot at the same term does not admit all the previous entries, and
# retains the stable index. The index will move when the snapshot is synced.
snap term=2 index=20
----
mark:{Term:2 Index:20}, stable:5, admitted:[5 5 5 5]
LowPri: {Term:2 Index:6}

# The same holds for a snapshot at a new term, with an index above the stable
# and admitted marks: it only updates the term of the marks.
snap term=3 index=10
----
[upd] mark:{Term:3 Index:10}, stable:5, admitted:[5 5 5 5]
LowPri: {Term:2 Index:6}

# A snapshot at a low index regresses the stable index, the admitted state,
# and the queue.
snap term=4 index=3
----
[upd] mark:{Term:4 Index:3}, stable:3, admitted:[3 3 3 3]

sync term=4 index=3
----
mark:{Term:4 Index:3}, stable:3, admitted:[3 3 3 3]

# ------------------------------------------------------------------------------
# Test stable index advancement racing with admission.

16 changes: 8 additions & 8 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/log_tracker.go
@@ -63,6 +63,14 @@ func (l *logTracker) admitted(sched bool) (av rac2.AdmittedVector, dirty bool) {
return av, dirty
}

func (l *logTracker) snap(ctx context.Context, mark rac2.LogMark) {
l.Lock()
defer l.Unlock()
if l.lt.Snap(ctx, mark) {
l.dirty = true
}
}

func (l *logTracker) append(ctx context.Context, after uint64, to rac2.LogMark) {
l.Lock()
defer l.Unlock()
@@ -106,14 +114,6 @@ func (l *logTracker) logAdmitted(ctx context.Context, at rac2.LogMark, pri raftp
return false
}

func (l *logTracker) snapSynced(ctx context.Context, mark rac2.LogMark) {
l.Lock()
defer l.Unlock()
if l.lt.SnapSynced(ctx, mark) {
l.dirty = true
}
}

func (l *logTracker) debugString() string {
l.Lock()
defer l.Unlock()
41 changes: 23 additions & 18 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
@@ -356,7 +356,7 @@ type Processor interface {
// snapshot or log entries batch. It can be called synchronously from
// OnLogSync or OnSnapSync handlers if the write batch is blocking, or
// asynchronously from OnLogSync.
SyncedLogStorage(ctx context.Context, mark rac2.LogMark, snap bool)
SyncedLogStorage(ctx context.Context, mark rac2.LogMark)
// AdmittedLogEntry is called when an entry is admitted. It can be called
// synchronously from within ACWorkQueue.Admit if admission is immediate.
AdmittedLogEntry(
@@ -758,6 +758,11 @@ func (p *processorImpl) createLeaderStateRaftMuLocked(
// HandleRaftReadyRaftMuLocked implements Processor.
func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.RaftEvent) {
p.opts.Replica.RaftMuAssertHeld()
// Register all snapshots / log appends without exception. If the replica is
// being destroyed, this should be a no-op, but there is no harm in
// registering the write just in case.
p.registerStorageAppendRaftMuLocked(ctx, e)

// Skip if the replica is not initialized or already destroyed.
if p.desc.replicas == nil || p.destroyed {
return
@@ -791,6 +796,8 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.
if !p.isLeaderUsingV2ProcLocked() {
return
}
// NB: since we've registered the latest log/snapshot write (if any) above,
// our admitted vector is likely consistent with the latest leader term.
p.maybeSendAdmittedRaftMuLocked(ctx)
if rc := p.leader.rc; rc != nil {
if err := rc.HandleRaftEventRaftMuLocked(ctx, e); err != nil {
@@ -854,22 +861,24 @@ func (p *processorImpl) maybeSendAdmittedRaftMuLocked(ctx context.Context) {
}})
}

func (p *processorImpl) registerLogAppend(ctx context.Context, e rac2.RaftEvent) {
if len(e.Entries) == 0 {
return
// registerStorageAppendRaftMuLocked registers the raft storage write with the
// logTracker. All raft writes must be seen by this function.
func (p *processorImpl) registerStorageAppendRaftMuLocked(ctx context.Context, e rac2.RaftEvent) {
// NB: snapshot must be handled first. If Ready contains both snapshot and
// entries, the entries are contiguous with the snapshot.
if snap := e.Snap; snap != nil {
mark := rac2.LogMark{Term: e.Term, Index: snap.Metadata.Index}
p.logTracker.snap(ctx, mark)
}
if len(e.Entries) != 0 {
after := e.Entries[0].Index - 1
to := rac2.LogMark{Term: e.Term, Index: e.Entries[len(e.Entries)-1].Index}
p.logTracker.append(ctx, after, to)
}
after := e.Entries[0].Index - 1
to := rac2.LogMark{Term: e.Term, Index: e.Entries[len(e.Entries)-1].Index}
p.logTracker.append(ctx, after, to)
}

// AdmitRaftEntriesRaftMuLocked implements Processor.
func (p *processorImpl) AdmitRaftEntriesRaftMuLocked(ctx context.Context, e rac2.RaftEvent) bool {
// Register all log appends without exception. If the replica is being
// destroyed, this should be a no-op, but there is no harm in registering the
// write just in case.
p.registerLogAppend(ctx, e)

// Return false only if we're not destroyed and not using V2.
if p.destroyed || !p.isLeaderUsingV2ProcLocked() {
return p.destroyed
@@ -1013,14 +1022,10 @@ func (p *processorImpl) SideChannelForPriorityOverrideAtFollowerRaftMuLocked(
}

// SyncedLogStorage implements Processor.
func (p *processorImpl) SyncedLogStorage(ctx context.Context, mark rac2.LogMark, snap bool) {
if snap {
p.logTracker.snapSynced(ctx, mark)
} else {
p.logTracker.logSynced(ctx, mark)
}
func (p *processorImpl) SyncedLogStorage(ctx context.Context, mark rac2.LogMark) {
// NB: storage syncs will trigger raft Ready processing, so we don't need to
// explicitly schedule it here like in AdmittedLogEntry.
p.logTracker.logSynced(ctx, mark)
}

// AdmittedLogEntry implements Processor.
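For intuition on the snapshot-first ordering in `registerStorageAppendRaftMuLocked` above: when a Ready carries both a snapshot and entries, the entries are contiguous with the snapshot, so registering the snapshot first keeps the tracker's log mark consistent. A toy sketch with hypothetical standalone types:

```go
package main

import "fmt"

// logMark and tracker are hypothetical stand-ins illustrating why the
// snapshot must be registered before the entries.
type logMark struct{ term, index uint64 }

type tracker struct{ last logMark }

func (t *tracker) snap(mark logMark) { t.last = mark }

func (t *tracker) append(after uint64, to logMark) {
	if after != t.last.index {
		panic("appended entries are not contiguous with the log")
	}
	t.last = to
}

func main() {
	tr := &tracker{last: logMark{term: 5, index: 20}}

	// A Ready at term 7 carrying a snapshot at index 100 plus entries
	// 101..105. Registering the snapshot first moves the mark to 100, so the
	// subsequent append (after=100) is contiguous; the reverse order would
	// see a gap between index 20 and 100.
	tr.snap(logMark{term: 7, index: 100})
	tr.append(100, logMark{term: 7, index: 105})

	fmt.Printf("last mark: %+v\n", tr.last) // {term:7 index:105}
}
```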
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
@@ -366,7 +366,7 @@ func TestProcessorBasic(t *testing.T) {
var mark rac2.LogMark
d.ScanArgs(t, "term", &mark.Term)
d.ScanArgs(t, "index", &mark.Index)
p.SyncedLogStorage(ctx, mark, false /* snap */)
p.SyncedLogStorage(ctx, mark)
printLogTracker()
return builderStr()

6 changes: 3 additions & 3 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor
@@ -407,12 +407,13 @@ HandleRaftReady:
RaftNode.TermLocked() = 52
Replica.MuUnlock
RangeControllerFactory.New(replicaSet=[(n1,s2):5,(n11,s11):11], leaseholder=10, nextRaftIndex=28)
RangeController.AdmitRaftMuLocked(5, {Term:52 Admitted:[26 26 26 26]})
RangeController.HandleRaftEventRaftMuLocked([28])
.....
AdmitRaftEntries:
ACWorkQueue.Admit({StoreID:2 TenantID:4 Priority:low-pri CreateTime:2 RequestedCount:100 Ingested:false RangeID:3 ReplicaID:5 CallbackState:{Mark:{Term:52 Index:28} Priority:LowPri}}) = true
destroyed-or-leader-using-v2: true
LogTracker [+dirty]: mark:{Term:52 Index:28}, stable:26, admitted:[26 26 26 26]
LogTracker: mark:{Term:52 Index:28}, stable:26, admitted:[26 26 26 26]
LowPri: {Term:52 Index:28}

# AdmitForEval returns true since there is a RangeController which admitted.
Expand Down Expand Up @@ -462,7 +463,7 @@ admitted: false err: rc-was-closed
# vector is not changed since the last time.
admitted-log-entry leader-term=52 index=28 pri=0
----
LogTracker [+dirty]: mark:{Term:52 Index:28}, stable:26, admitted:[26 26 26 26]
LogTracker: mark:{Term:52 Index:28}, stable:26, admitted:[26 26 26 26]

# Noop.
handle-raft-ready-and-admit
Expand All @@ -475,7 +476,6 @@ HandleRaftReady:
Replica.LeaseholderMuLocked
RaftNode.TermLocked() = 52
Replica.MuUnlock
RangeController.AdmitRaftMuLocked(5, {Term:52 Admitted:[26 26 26 26]})
RangeController.HandleRaftEventRaftMuLocked([])
.....

4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_raft.go
@@ -1676,7 +1676,7 @@ func (r *replicaSyncCallback) OnLogSync(
// The log mark is non-empty only if this was a non-empty log append that
// updated the stable log mark.
if mark := done.Mark(); mark.Term != 0 {
repl.flowControlV2.SyncedLogStorage(ctx, mark, false /* snap */)
repl.flowControlV2.SyncedLogStorage(ctx, mark)
}
// Block sending the responses back to raft, if a test needs to.
if fn := repl.store.TestingKnobs().TestingAfterRaftLogSync; fn != nil {
@@ -1692,7 +1692,7 @@ func (r *replicaSyncCallback) OnSnapSync(ctx context.Context, done logstore.MsgStorageAppendDone) {
func (r *replicaSyncCallback) OnSnapSync(ctx context.Context, done logstore.MsgStorageAppendDone) {
repl := (*Replica)(r)
// NB: when storing snapshot, done always contains a non-zero log mark.
repl.flowControlV2.SyncedLogStorage(ctx, done.Mark(), true /* snap */)
repl.flowControlV2.SyncedLogStorage(ctx, done.Mark())
repl.sendRaftMessages(ctx, done.Responses(), nil /* blocked */, true /* willDeliverLocal */)
}
