diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go b/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go
index b37c70f0cb64..8aecf48303be 100644
--- a/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go
@@ -114,6 +114,28 @@ func (l *LogTracker) Admitted() AdmittedVector {
 	return a
 }
 
+// Snap informs the tracker that a snapshot at the given log mark is about to
+// be sent to storage, and that the log will be truncated.
+//
+// Returns true if the admitted vector has changed. The only way it can change
+// is when the leader term goes up, in which case indices can regress after
+// the log is truncated by this newer-term write.
+func (l *LogTracker) Snap(ctx context.Context, mark LogMark) bool {
+	if !mark.After(l.last) {
+		l.errorf(ctx, "registering stale snapshot %+v", mark)
+		return false
+	}
+	// Fake an append spanning the gap between the log and the snapshot. It will,
+	// if necessary, truncate the stable index and remove entries waiting for
+	// admission that became obsolete.
+	//
+	// Entries waiting in the admission queue remain queued, even though not all
+	// of them are necessarily in the pre-image of the snapshot. We choose this
+	// in order to retain some backpressure: all these entries were written
+	// anyway, and are waiting for storage admission.
+	return l.Append(ctx, min(l.last.Index, mark.Index), mark)
+}
+
 // Append informs the tracker that log entries at indices (after, to.Index] are
 // about to be sent to stable storage, on behalf of the to.Term leader.
 //
@@ -256,25 +278,6 @@ func (l *LogTracker) LogAdmitted(ctx context.Context, at LogMark, pri raftpb.Pri
 	return updated
 }
 
-// SnapSynced informs the tracker that a snapshot at the given log mark has been
-// stored/synced, and the log is cleared.
-//
-// Returns true if the admitted vector has changed.
-func (l *LogTracker) SnapSynced(ctx context.Context, mark LogMark) bool {
-	if !mark.After(l.last) {
-		l.errorf(ctx, "syncing stale snapshot %+v", mark)
-		return false
-	}
-	// Fake an append spanning the gap between the log and the snapshot. It will,
-	// if necessary, truncate the stable index and remove entries waiting for
-	// admission that became obsolete.
-	updated := l.Append(ctx, min(l.last.Index, mark.Index), mark)
-	if l.LogSynced(ctx, mark) {
-		return true
-	}
-	return updated
-}
-
 // String returns a string representation of the LogTracker.
 func (l *LogTracker) String() string {
 	return redact.StringWithoutMarkers(l)
diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker_test.go
index 5c521cec2c60..c7d6c4f1154a 100644
--- a/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker_test.go
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker_test.go
@@ -156,7 +156,7 @@ func TestLogTrackerLogSynced(t *testing.T) {
 	}
 }
 
-func TestLogTrackerSnapSynced(t *testing.T) {
+func TestLogTrackerSnap(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
@@ -164,15 +164,16 @@ func TestLogTrackerSnapSynced(t *testing.T) {
 		last   LogMark
 		stable uint64
 		snap   LogMark
+		want   LogMark // the stable mark after registering the snapshot
 		notOk  bool
 	}{
 		// Invalid snapshots.
 		{last: mark(5, 20), snap: mark(4, 30), notOk: true},
 		{last: mark(5, 20), snap: mark(5, 10), notOk: true},
 		// Valid snapshots.
-		{last: mark(5, 20), snap: mark(5, 30)},
-		{last: mark(5, 20), snap: mark(6, 10)},
-		{last: mark(5, 20), snap: mark(6, 30)},
+		{last: mark(5, 20), stable: 20, snap: mark(5, 30), want: mark(5, 20)},
+		{last: mark(5, 20), stable: 15, snap: mark(6, 10), want: mark(6, 10)},
+		{last: mark(5, 20), stable: 20, snap: mark(6, 30), want: mark(6, 20)},
 	} {
 		t.Run("", func(t *testing.T) {
 			defer func() {
@@ -182,10 +183,10 @@ func TestLogTrackerSnapSynced(t *testing.T) {
 			l := NewLogTracker(tt.last)
 			l.stable = tt.stable
 			l.check(t)
-			l.SnapSynced(context.Background(), tt.snap)
+			l.Snap(context.Background(), tt.snap)
 			l.check(t)
 			require.Equal(t, tt.snap, l.last)
-			require.Equal(t, tt.snap.Index, l.Stable().Index)
+			require.Equal(t, tt.want, l.Stable())
 		})
 	}
 }
@@ -240,6 +241,11 @@ func TestLogTracker(t *testing.T) {
 			updated := tracker.Append(ctx, after, to)
 			return state(updated)
 
+		case "snap":
+			mark := readMark(t, d, "index")
+			updated := tracker.Snap(ctx, mark)
+			return state(updated)
+
 		case "sync": // Example: sync term=10 index=100
 			mark := readMark(t, d, "index")
 			updated := tracker.LogSynced(ctx, mark)
diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/log_tracker b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/log_tracker
index fdf07a92e695..7c27d7b680f6 100644
--- a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/log_tracker
+++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/log_tracker
@@ -47,6 +47,35 @@ append term=2 after=5 to=10
 ----
 [upd] mark:{Term:2 Index:10}, stable:5, admitted:[5 5 5 5]
 
+register term=2 index=6 pri=LowPri
+----
+mark:{Term:2 Index:10}, stable:5, admitted:[5 5 5 5]
+LowPri: {Term:2 Index:6}
+
+# A snapshot at the same term does not admit all the previous entries, and
+# retains the stable index. The index will move when the snapshot is synced.
+snap term=2 index=20
+----
+mark:{Term:2 Index:20}, stable:5, admitted:[5 5 5 5]
+LowPri: {Term:2 Index:6}
+
+# The same holds for a snapshot at a new term, with an index higher than the
+# previous marks. It only updates the term of the marks.
+snap term=3 index=10
+----
+[upd] mark:{Term:3 Index:10}, stable:5, admitted:[5 5 5 5]
+LowPri: {Term:2 Index:6}
+
+# A snapshot at a low index regresses the stable index, the admitted state,
+# and the admission queue.
+snap term=4 index=3
+----
+[upd] mark:{Term:4 Index:3}, stable:3, admitted:[3 3 3 3]
+
+sync term=4 index=3
+----
+mark:{Term:4 Index:3}, stable:3, admitted:[3 3 3 3]
+
 # ------------------------------------------------------------------------------
 # Test stable index advancement racing with admission.
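To make the new semantics concrete, here is a small illustrative sketch (not part of the diff) that drives a `rac2.LogTracker` through the second test case above: a snapshot at a newer term whose index is below the last appended index. The `main` wrapper and the initial marks are assumptions chosen for a self-contained example; the calls (`NewLogTracker`, `Append`, `LogSynced`, `Snap`, `Stable`) are the ones exercised by the tests.

```go
package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
)

func main() {
	ctx := context.Background()
	// The tracker starts from a durable state at (term=5, index=10), appends
	// entries (10, 20] at term 5, and syncs up to index 15.
	lt := rac2.NewLogTracker(rac2.LogMark{Term: 5, Index: 10})
	lt.Append(ctx, 10, rac2.LogMark{Term: 5, Index: 20})
	lt.LogSynced(ctx, rac2.LogMark{Term: 5, Index: 15})

	// A snapshot at (term=6, index=10) registers as a fake append over
	// (min(20, 10), 10]. The newer term truncates the log suffix written under
	// term 5, so the stable index regresses from 15 to 10.
	lt.Snap(ctx, rac2.LogMark{Term: 6, Index: 10})
	fmt.Println(lt.Stable()) // expected: {Term:6 Index:10}
}
```

The expected `{Term:6 Index:10}` stable mark matches the `want` column added to the test table above.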
diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/log_tracker.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/log_tracker.go
index 68ac7cea2d74..20a378a1cb34 100644
--- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/log_tracker.go
+++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/log_tracker.go
@@ -63,6 +63,14 @@ func (l *logTracker) admitted(sched bool) (av rac2.AdmittedVector, dirty bool) {
 	return av, dirty
 }
 
+func (l *logTracker) snap(ctx context.Context, mark rac2.LogMark) {
+	l.Lock()
+	defer l.Unlock()
+	if l.lt.Snap(ctx, mark) {
+		l.dirty = true
+	}
+}
+
 func (l *logTracker) append(ctx context.Context, after uint64, to rac2.LogMark) {
 	l.Lock()
 	defer l.Unlock()
@@ -106,14 +114,6 @@ func (l *logTracker) logAdmitted(ctx context.Context, at rac2.LogMark, pri raftp
 	return false
 }
 
-func (l *logTracker) snapSynced(ctx context.Context, mark rac2.LogMark) {
-	l.Lock()
-	defer l.Unlock()
-	if l.lt.SnapSynced(ctx, mark) {
-		l.dirty = true
-	}
-}
-
 func (l *logTracker) debugString() string {
 	l.Lock()
 	defer l.Unlock()
diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
index abf1a42bc96d..caac185bd2dc 100644
--- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
+++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
@@ -356,7 +356,7 @@ type Processor interface {
 	// snapshot or log entries batch. It can be called synchronously from
 	// OnLogSync or OnSnapSync handlers if the write batch is blocking, or
 	// asynchronously from OnLogSync.
-	SyncedLogStorage(ctx context.Context, mark rac2.LogMark, snap bool)
+	SyncedLogStorage(ctx context.Context, mark rac2.LogMark)
 	// AdmittedLogEntry is called when an entry is admitted. It can be called
 	// synchronously from within ACWorkQueue.Admit if admission is immediate.
 	AdmittedLogEntry(
@@ -758,6 +758,11 @@ func (p *processorImpl) createLeaderStateRaftMuLocked(
 
 // HandleRaftReadyRaftMuLocked implements Processor.
 func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.RaftEvent) {
 	p.opts.Replica.RaftMuAssertHeld()
+	// Register all snapshots / log appends without exception. If the replica is
+	// being destroyed, this should be a no-op, but there is no harm in
+	// registering the write just in case.
+	p.registerStorageAppendRaftMuLocked(ctx, e)
+	// Skip if the replica is not initialized or already destroyed.
 	if p.desc.replicas == nil || p.destroyed {
 		return
@@ -791,6 +796,8 @@ func (p *processorImpl) HandleRaftReadyRaftMuLocked(ctx context.Context, e rac2.
 	if !p.isLeaderUsingV2ProcLocked() {
 		return
 	}
+	// NB: since we've registered the latest log/snapshot write (if any) above,
+	// our admitted vector is likely consistent with the latest leader term.
 	p.maybeSendAdmittedRaftMuLocked(ctx)
 	if rc := p.leader.rc; rc != nil {
 		if err := rc.HandleRaftEventRaftMuLocked(ctx, e); err != nil {
@@ -854,22 +861,24 @@ func (p *processorImpl) maybeSendAdmittedRaftMuLocked(ctx context.Context) {
 	}})
 }
 
-func (p *processorImpl) registerLogAppend(ctx context.Context, e rac2.RaftEvent) {
-	if len(e.Entries) == 0 {
-		return
+// registerStorageAppendRaftMuLocked registers the raft storage write with the
+// logTracker. All raft writes must be seen by this function.
+func (p *processorImpl) registerStorageAppendRaftMuLocked(ctx context.Context, e rac2.RaftEvent) {
+	// NB: snapshot must be handled first. If Ready contains both snapshot and
+	// entries, the entries are contiguous with the snapshot.
+	if snap := e.Snap; snap != nil {
+		mark := rac2.LogMark{Term: e.Term, Index: snap.Metadata.Index}
+		p.logTracker.snap(ctx, mark)
+	}
+	if len(e.Entries) != 0 {
+		after := e.Entries[0].Index - 1
+		to := rac2.LogMark{Term: e.Term, Index: e.Entries[len(e.Entries)-1].Index}
+		p.logTracker.append(ctx, after, to)
 	}
-	after := e.Entries[0].Index - 1
-	to := rac2.LogMark{Term: e.Term, Index: e.Entries[len(e.Entries)-1].Index}
-	p.logTracker.append(ctx, after, to)
 }
 
 // AdmitRaftEntriesRaftMuLocked implements Processor.
 func (p *processorImpl) AdmitRaftEntriesRaftMuLocked(ctx context.Context, e rac2.RaftEvent) bool {
-	// Register all log appends without exception. If the replica is being
-	// destroyed, this should be a no-op, but there is no harm in registering the
-	// write just in case.
-	p.registerLogAppend(ctx, e)
-
 	// Return false only if we're not destroyed and not using V2.
 	if p.destroyed || !p.isLeaderUsingV2ProcLocked() {
 		return p.destroyed
@@ -1013,14 +1022,10 @@ func (p *processorImpl) SideChannelForPriorityOverrideAtFollowerRaftMuLocked(
 }
 
 // SyncedLogStorage implements Processor.
-func (p *processorImpl) SyncedLogStorage(ctx context.Context, mark rac2.LogMark, snap bool) {
-	if snap {
-		p.logTracker.snapSynced(ctx, mark)
-	} else {
-		p.logTracker.logSynced(ctx, mark)
-	}
+func (p *processorImpl) SyncedLogStorage(ctx context.Context, mark rac2.LogMark) {
 	// NB: storage syncs will trigger raft Ready processing, so we don't need to
 	// explicitly schedule it here like in AdmittedLogEntry.
+	p.logTracker.logSynced(ctx, mark)
 }
 
 // AdmittedLogEntry implements Processor.
diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
index 6769bffb121a..f317f63864a3 100644
--- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
+++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
@@ -366,7 +366,7 @@ func TestProcessorBasic(t *testing.T) {
 			var mark rac2.LogMark
 			d.ScanArgs(t, "term", &mark.Term)
 			d.ScanArgs(t, "index", &mark.Index)
-			p.SyncedLogStorage(ctx, mark, false /* snap */)
+			p.SyncedLogStorage(ctx, mark)
 			printLogTracker()
 			return builderStr()
 
diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor
index 33b27e6166d8..a3ddd4b3f070 100644
--- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor
+++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor
@@ -407,12 +407,13 @@ HandleRaftReady:
  RaftNode.TermLocked() = 52
  Replica.MuUnlock
  RangeControllerFactory.New(replicaSet=[(n1,s2):5,(n11,s11):11], leaseholder=10, nextRaftIndex=28)
+ RangeController.AdmitRaftMuLocked(5, {Term:52 Admitted:[26 26 26 26]})
  RangeController.HandleRaftEventRaftMuLocked([28])
 .....
 AdmitRaftEntries:
  ACWorkQueue.Admit({StoreID:2 TenantID:4 Priority:low-pri CreateTime:2 RequestedCount:100 Ingested:false RangeID:3 ReplicaID:5 CallbackState:{Mark:{Term:52 Index:28} Priority:LowPri}}) = true
 destroyed-or-leader-using-v2: true
-LogTracker [+dirty]: mark:{Term:52 Index:28}, stable:26, admitted:[26 26 26 26]
+LogTracker: mark:{Term:52 Index:28}, stable:26, admitted:[26 26 26 26]
 LowPri: {Term:52 Index:28}
 
@@ -462,7 +463,7 @@ admitted: false err: rc-was-closed
 # vector is not changed since the last time.
 admitted-log-entry leader-term=52 index=28 pri=0
 ----
-LogTracker [+dirty]: mark:{Term:52 Index:28}, stable:26, admitted:[26 26 26 26]
+LogTracker: mark:{Term:52 Index:28}, stable:26, admitted:[26 26 26 26]
 
 # Noop.
 handle-raft-ready-and-admit
@@ -475,7 +476,6 @@ HandleRaftReady:
  Replica.LeaseholderMuLocked
  RaftNode.TermLocked() = 52
  Replica.MuUnlock
- RangeController.AdmitRaftMuLocked(5, {Term:52 Admitted:[26 26 26 26]})
  RangeController.HandleRaftEventRaftMuLocked([])
 .....
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index 6a9f33927923..016fe7de7e3a 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -1676,7 +1676,7 @@ func (r *replicaSyncCallback) OnLogSync(
 	// The log mark is non-empty only if this was a non-empty log append that
 	// updated the stable log mark.
 	if mark := done.Mark(); mark.Term != 0 {
-		repl.flowControlV2.SyncedLogStorage(ctx, mark, false /* snap */)
+		repl.flowControlV2.SyncedLogStorage(ctx, mark)
 	}
 	// Block sending the responses back to raft, if a test needs to.
 	if fn := repl.store.TestingKnobs().TestingAfterRaftLogSync; fn != nil {
@@ -1692,7 +1692,7 @@ func (r *replicaSyncCallback) OnSnapSync(ctx context.Context, done logstore.MsgStorageAppendDone) {
 	repl := (*Replica)(r)
 	// NB: when storing snapshot, done always contains a non-zero log mark.
-	repl.flowControlV2.SyncedLogStorage(ctx, done.Mark(), true /* snap */)
+	repl.flowControlV2.SyncedLogStorage(ctx, done.Mark())
 	repl.sendRaftMessages(ctx, done.Responses(), nil /* blocked */, true /* willDeliverLocal */)
 }
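For reference, a sketch (again not part of the diff, same assumed import path and `main` wrapper) of the write/sync lifecycle after this change: a Ready carrying both a snapshot and contiguous entries registers the snapshot first, mirroring `registerStorageAppendRaftMuLocked`, and the storage sync is then reported through the single snapshot-agnostic `SyncedLogStorage` → `logSynced` path.

```go
package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
)

func main() {
	ctx := context.Background()
	lt := rac2.NewLogTracker(rac2.LogMark{Term: 5, Index: 20})

	// Registration phase: a Ready with a snapshot at (term=7, index=100) plus
	// entries (100, 105] appended on top of it. Per the NB comment in the diff,
	// the snapshot goes first, so the Append sees a log ending at index 100.
	lt.Snap(ctx, rac2.LogMark{Term: 7, Index: 100})
	lt.Append(ctx, 100, rac2.LogMark{Term: 7, Index: 105})

	// Sync phase: once storage acknowledges the batch, a single call reports
	// the mark. The snap flag on SyncedLogStorage is gone because LogSynced
	// now covers the snapshot case as well.
	lt.LogSynced(ctx, rac2.LogMark{Term: 7, Index: 105})
	fmt.Println(lt.Stable()) // expected: {Term:7 Index:105}
}
```

This is also why `OnLogSync` and `OnSnapSync` in replica_raft.go can now converge on the same `SyncedLogStorage(ctx, mark)` call.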