From a58e2a90f4df0839f5fe44b049422e201c3e4884 Mon Sep 17 00:00:00 2001 From: Andrew Baptist Date: Fri, 20 Sep 2024 10:21:59 -0400 Subject: [PATCH] raft: add tracing to important raft points This commit adds tracing to the various places within raft where state transitions happen. When a message is first proposed, it matches the context of the original message to the raft entry and registers it for tracing. Then it adds to the trace at the key points during the transitions and finally stops tracing once a `MsgStorageApplyResp` is received past the entry. Fixes: #104035 Release note: None --- pkg/kv/kvserver/BUILD.bazel | 2 + pkg/kv/kvserver/client_raft_log_queue_test.go | 62 +++++++++++++++++++ pkg/kv/kvserver/replica.go | 5 ++ pkg/kv/kvserver/replica_destroy.go | 1 + pkg/kv/kvserver/replica_init.go | 2 + pkg/kv/kvserver/replica_proposal.go | 4 +- pkg/kv/kvserver/replica_proposal_buf.go | 39 ++++++++---- pkg/kv/kvserver/replica_proposal_buf_test.go | 2 + pkg/kv/kvserver/replica_raft.go | 10 +++ pkg/kv/kvserver/store.go | 5 ++ 10 files changed, 118 insertions(+), 14 deletions(-) diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 18c1ca1edbef..bcf96f61a8d0 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -162,6 +162,7 @@ go_library( "//pkg/kv/kvserver/multiqueue", "//pkg/kv/kvserver/raftentry", "//pkg/kv/kvserver/raftlog", + "//pkg/kv/kvserver/rafttrace", "//pkg/kv/kvserver/rangefeed", "//pkg/kv/kvserver/rditer", "//pkg/kv/kvserver/readsummary", @@ -438,6 +439,7 @@ go_test( "//pkg/kv/kvserver/protectedts/ptutil", "//pkg/kv/kvserver/raftentry", "//pkg/kv/kvserver/raftlog", + "//pkg/kv/kvserver/rafttrace", "//pkg/kv/kvserver/raftutil", "//pkg/kv/kvserver/rangefeed", "//pkg/kv/kvserver/rditer", diff --git a/pkg/kv/kvserver/client_raft_log_queue_test.go b/pkg/kv/kvserver/client_raft_log_queue_test.go index 9a466877c684..a988d1a02970 100644 --- a/pkg/kv/kvserver/client_raft_log_queue_test.go +++ b/pkg/kv/kvserver/client_raft_log_queue_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -33,6 +34,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/vfs" "github.com/gogo/protobuf/proto" @@ -132,6 +135,65 @@ func TestRaftLogQueue(t *testing.T) { } } +func TestRaftTracing(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // TODO(baptist): Remove this once we change the default to be enabled. + st := cluster.MakeTestingClusterSettings() + rafttrace.MaxConcurrentRaftTraces.Override(context.Background(), &st.SV, 10) + + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: st, + RaftConfig: base.RaftConfig{ + RangeLeaseDuration: 24 * time.Hour, // disable lease moves + RaftElectionTimeoutTicks: 1 << 30, // disable elections + }, + }, + }) + defer tc.Stopper().Stop(context.Background()) + store := tc.GetFirstStoreFromServer(t, 0) + + // Write a single value to ensure we have a leader on n1. + key := tc.ScratchRange(t) + _, pErr := kv.SendWrapped(context.Background(), store.TestSender(), putArgs(key, []byte("value"))) + require.NoError(t, pErr.GoError()) + require.NoError(t, tc.WaitForSplitAndInitialization(key)) + // Set to have 3 voters. + tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...) + tc.WaitForVotersOrFatal(t, key, tc.Targets(1, 2)...) + + for i := 0; i < 100; i++ { + var finish func() tracingpb.Recording + ctx := context.Background() + if i == 50 { + // Trace a random request on a "client" tracer. + ctx, finish = tracing.ContextWithRecordingSpan(ctx, store.GetStoreConfig().Tracer(), "test") + } + _, pErr := kv.SendWrapped(ctx, store.TestSender(), putArgs(key, []byte(fmt.Sprintf("value-%d", i)))) + require.NoError(t, pErr.GoError()) + // Note that this is the clients span, there may be additional logs created after the span is returned. + if finish != nil { + output := finish().String() + // NB: It is hard to get all the messages in an expected order. We + // simply ensure some of the key messages are returned. Also note + // that we want to make sure that the logs are not reported against + // the tracing library, but the line that called into it. + expectedMessages := []string{ + `replica_proposal_buf.* flushing proposal to Raft`, + `replica_proposal_buf.* registering local trace`, + `replica_raft.* 1->2 MsgApp`, + `replica_raft.* 1->3 MsgApp`, + `replica_raft.* AppendThread->1 MsgStorageAppendResp`, + `ack-ing replication success to the client`, + } + require.NoError(t, testutils.MatchInOrder(output, expectedMessages...)) + } + } +} + // TestCrashWhileTruncatingSideloadedEntries emulates a process crash in the // middle of applying a raft log truncation command that removes some entries // from the sideloaded storage. The test expects that storage remains in a diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 1eb4a3369dfc..0b2fbc22408d 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" @@ -891,6 +892,10 @@ type Replica struct { // MsgAppPull <=> LazyReplication. // Updated with both raftMu and mu held. currentRACv2Mode rac2.RaftMsgAppMode + + // raftTracer is used to trace raft messages that are sent with a + // tracing context. + raftTracer rafttrace.RaftTracer } // The raft log truncations that are pending. Access is protected by its own diff --git a/pkg/kv/kvserver/replica_destroy.go b/pkg/kv/kvserver/replica_destroy.go index 553b5e012fd7..3d730e240094 100644 --- a/pkg/kv/kvserver/replica_destroy.go +++ b/pkg/kv/kvserver/replica_destroy.go @@ -181,4 +181,5 @@ func (r *Replica) disconnectReplicationRaftMuLocked(ctx context.Context) { log.Fatalf(ctx, "removing raft group before destroying replica %s", r) } r.mu.internalRaftGroup = nil + r.mu.raftTracer.Close() } diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 0a802d3982c3..193f98f9e57c 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/raft" @@ -328,6 +329,7 @@ func (r *Replica) initRaftGroupRaftMuLockedReplicaMuLocked() error { return err } r.mu.internalRaftGroup = rg + r.mu.raftTracer = *rafttrace.NewRaftTracer(ctx, r.Tracer, r.ClusterSettings(), &r.store.concurrentRaftTraces) r.flowControlV2.InitRaftLocked( ctx, replica_rac2.NewRaftNode(rg, (*replicaForRACv2)(r)), rg.LogMark()) return nil diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index 5b5b8bc1eb0b..c0261d33b8eb 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -118,10 +118,10 @@ type ProposalData struct { // for best coverage. `p.ctx` should be used when a `replicatedCmd` is not in // scope, i.e. outside of raft command application. // - // The context may by updated during the proposal lifecycle but will never + // The context may be updated during the proposal lifecycle but will never // be nil. To clear out the context, set it to context.Background(). It is // protected by an atomic pointer because it can be read without holding the - // raftMu use ProposalData.Context() to read it. + // raftMu. Use ProposalData.Context() to read it. // // TODO(baptist): Track down all the places where we read and write ctx and // determine whether we can convert this back to non-atomic field. diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index a55cbc122ce3..5d96eceb0098 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -126,6 +126,7 @@ type singleBatchProposer interface { getReplicaID() roachpb.ReplicaID flowControlHandle(ctx context.Context) kvflowcontrol.Handle onErrProposalDropped([]raftpb.Entry, []*ProposalData, raftpb.StateType) + registerForTracing(*ProposalData, raftpb.Entry) bool } // A proposer is an object that uses a propBuf to coordinate Raft proposals. @@ -874,19 +875,29 @@ func proposeBatch( p.onErrProposalDropped(ents, props, raftGroup.BasicStatus().RaftState) return nil //nolint:returnerrcheck } - if err == nil { - // Now that we know what raft log position[1] this proposal is to end up - // in, deduct flow tokens for it. This is done without blocking (we've - // already waited for available flow tokens pre-evaluation). The tokens - // will later be returned once we're informed of the entry being - // admitted below raft. - // - // [1]: We're relying on an undocumented side effect of upstream raft - // API where it populates the index and term for the passed in - // slice of entries. See etcd-io/raft#57. - maybeDeductFlowTokens(ctx, p.flowControlHandle(ctx), handles, ents) + if err != nil { + return err } - return err + // Now that we know what raft log position[1] this proposal is to end up + // in, deduct flow tokens for it. This is done without blocking (we've + // already waited for available flow tokens pre-evaluation). The tokens + // will later be returned once we're informed of the entry being + // admitted below raft. + // + // [1]: We're relying on an undocumented side effect of upstream raft + // API where it populates the index and term for the passed in + // slice of entries. See etcd-io/raft#57. + maybeDeductFlowTokens(ctx, p.flowControlHandle(ctx), handles, ents) + + // Register the proposal with rafttrace. This will add the trace to the raft + // lifecycle. We trace at most one entry per batch, so break after the first + // one is successfully registered. + for i := range ents { + if p.registerForTracing(props[i], ents[i]) { + break + } + } + return nil } func maybeDeductFlowTokens( @@ -1173,6 +1184,10 @@ func (rp *replicaProposer) closedTimestampTarget() hlc.Timestamp { return (*Replica)(rp).closedTimestampTargetRLocked() } +func (rp *replicaProposer) registerForTracing(p *ProposalData, e raftpb.Entry) bool { + return (*Replica)(rp).mu.raftTracer.MaybeRegister(p.Context(), e) +} + func (rp *replicaProposer) withGroupLocked(fn func(raftGroup proposerRaft) error) error { return (*Replica)(rp).withRaftGroupLocked(func(raftGroup *raft.RawNode) (bool, error) { // We're proposing a command here so there is no need to wake the leader diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go index 51d72be45a23..126febaaa575 100644 --- a/pkg/kv/kvserver/replica_proposal_buf_test.go +++ b/pkg/kv/kvserver/replica_proposal_buf_test.go @@ -217,6 +217,8 @@ func (t *testProposer) campaignLocked(ctx context.Context) { } } +func (t *testProposer) registerForTracing(*ProposalData, raftpb.Entry) bool { return true } + func (t *testProposer) rejectProposalWithErrLocked(_ context.Context, _ *ProposalData, err error) { if t.onRejectProposalWithErrLocked == nil { panic(fmt.Sprintf("unexpected rejectProposalWithErrLocked call: err=%v", err)) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 5e6e2a9720d8..165d2072cdaa 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -638,6 +638,11 @@ func (r *Replica) stepRaftGroupRaftMuLocked(req *kvserverpb.RaftMessageRequest) var sideChannelInfo replica_rac2.SideChannelInfoUsingRaftMessageRequest var admittedVector rac2.AdmittedVector err := r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) { + // If this message requested tracing, begin tracing it. + for _, e := range req.TracedEntries { + r.mu.raftTracer.RegisterRemote(e) + } + r.mu.raftTracer.MaybeTrace(req.Message) // We're processing an incoming raft message (from a batch that may // include MsgVotes), so don't campaign if we wake up our raft // group. @@ -1212,6 +1217,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( } } + r.mu.raftTracer.MaybeTrace(msgStorageAppend) if state, err = s.StoreEntries(ctx, state, app, cb, &stats.append); err != nil { return stats, errors.Wrap(err, "while storing log entries") } @@ -1243,6 +1249,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( stats.tApplicationBegin = timeutil.Now() if hasMsg(msgStorageApply) { + r.mu.raftTracer.MaybeTrace(msgStorageApply) r.traceEntries(msgStorageApply.Entries, "committed, before applying any entries") err := appTask.ApplyCommittedEntries(ctx) @@ -1992,6 +1999,7 @@ func (r *Replica) deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked( } for i, m := range localMsgs { + r.mu.raftTracer.MaybeTrace(m) if err := raftGroup.Step(m); err != nil { log.Fatalf(ctx, "unexpected error stepping local raft message [%s]: %v", raft.DescribeMessage(m, raftEntryFormatter), err) @@ -2015,6 +2023,7 @@ func (r *Replica) sendRaftMessage( lastToReplica, lastFromReplica := r.getLastReplicaDescriptors() r.mu.RLock() + traced := r.mu.raftTracer.MaybeTrace(msg) fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.From), lastToReplica) toReplica, toErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.To), lastFromReplica) var startKey roachpb.RKey @@ -2067,6 +2076,7 @@ func (r *Replica) sendRaftMessage( RangeStartKey: startKey, // usually nil UsingRac2Protocol: r.flowControlV2.GetEnabledWhenLeader() >= kvflowcontrol.V2EnabledWhenLeaderV1Encoding, LowPriorityOverride: lowPriorityOverride, + TracedEntries: traced, } // For RACv2, annotate successful MsgAppResp messages with the vector of // admitted log indices, by priority. diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 3ec55dbaf5bb..46867e63aee3 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -942,6 +942,11 @@ type Store struct { // has likely improved). draining atomic.Bool + // concurrentRaftTraces is the number of concurrent raft trace requests that + // are currently registered. This limit is used to prevent extensive raft + // tracing from inadvertently impacting performance. + concurrentRaftTraces atomic.Int64 + // Locking notes: To avoid deadlocks, the following lock order must be // obeyed: baseQueue.mu < Replica.raftMu < Replica.readOnlyCmdMu < Store.mu // < Replica.mu < Replica.unreachablesMu < Store.coalescedMu < Store.scheduler.mu.