From 60ecf7bf45d114bb142f1e38ce973e697753030d Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 4 Sep 2024 10:41:10 +0100 Subject: [PATCH] kvserver: add leader-side TODOs for admitted vectors Epic: none Release note: none --- pkg/kv/kvserver/raft_transport.go | 20 ++++++++++---------- pkg/kv/kvserver/replica_raft.go | 5 +++++ 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 9826b7f89b46..5155aa8f8152 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -453,16 +453,6 @@ func (t *RaftTransport) handleRaftRequest( log.Infof(ctx, "informed of below-raft %s", admittedEntries) } } - if len(req.AdmittedResponse) > 0 { - // NB: we do this via this special path instead of using the - // incomingMessageHandler since we don't have a full-fledged - // RaftMessageRequest for each range (each of these responses could be for - // a different range), and because what we need to do wrt queueing is much - // simpler (we don't need to worry about queue size since we only keep the - // latest message from each replica). - t.kvflowcontrol2.piggybackedResponseScheduler.ScheduleAdmittedResponseForRangeRACv2( - ctx, req.AdmittedResponse) - } if req.ToReplica.StoreID == roachpb.StoreID(0) && len(req.AdmittedRaftLogEntries) > 0 { // The fallback token dispatch mechanism does not specify a destination // replica, and as such, there's no handler for it. We don't want to @@ -543,6 +533,16 @@ func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer t.kvflowControl.mu.Lock() t.kvflowControl.mu.connectionTracker.markStoresConnected(storeIDs) t.kvflowControl.mu.Unlock() + if len(batch.AdmittedStates) != 0 { + // TODO(pav-kv): dispatch admitted vectors to RACv2. + // NB: we do this via this special path instead of using the + // handleRaftRequest path since we don't have a full-fledged + // RaftMessageRequest for each range (each of these responses could + // be for a different range), and because what we need to do w.r.t. + // queueing is much simpler (we don't need to worry about queue size + // since we only keep the latest message from each replica). + _ = t.kvflowcontrol2.piggybackedResponseScheduler.ScheduleAdmittedResponseForRangeRACv2 + } if len(batch.Requests) == 0 { continue } diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index c0551cb5d0cf..2e71729f6ff4 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -651,6 +651,11 @@ func (r *Replica) stepRaftGroupRaftMuLocked(req *kvserverpb.RaftMessageRequest) LowPriOverride: req.LowPriorityOverride, } } + case raftpb.MsgAppResp: + if req.AdmittedState.Term != 0 { + // TODO(pav-kv): dispatch admitted vector to RACv2 if one is attached. + _ = 0 + } } err := raftGroup.Step(req.Message) if errors.Is(err, raft.ErrProposalDropped) {