diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel
index 18c1ca1edbef..bcf96f61a8d0 100644
--- a/pkg/kv/kvserver/BUILD.bazel
+++ b/pkg/kv/kvserver/BUILD.bazel
@@ -162,6 +162,7 @@ go_library(
         "//pkg/kv/kvserver/multiqueue",
         "//pkg/kv/kvserver/raftentry",
         "//pkg/kv/kvserver/raftlog",
+        "//pkg/kv/kvserver/rafttrace",
         "//pkg/kv/kvserver/rangefeed",
         "//pkg/kv/kvserver/rditer",
         "//pkg/kv/kvserver/readsummary",
@@ -438,6 +439,7 @@ go_test(
         "//pkg/kv/kvserver/protectedts/ptutil",
         "//pkg/kv/kvserver/raftentry",
         "//pkg/kv/kvserver/raftlog",
+        "//pkg/kv/kvserver/rafttrace",
         "//pkg/kv/kvserver/raftutil",
         "//pkg/kv/kvserver/rangefeed",
         "//pkg/kv/kvserver/rditer",
diff --git a/pkg/kv/kvserver/client_raft_log_queue_test.go b/pkg/kv/kvserver/client_raft_log_queue_test.go
index 9a466877c684..a988d1a02970 100644
--- a/pkg/kv/kvserver/client_raft_log_queue_test.go
+++ b/pkg/kv/kvserver/client_raft_log_queue_test.go
@@ -20,6 +20,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
+    "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace"
     "github.com/cockroachdb/cockroach/pkg/roachpb"
     "github.com/cockroachdb/cockroach/pkg/rpc"
     "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
@@ -33,6 +34,8 @@ import (
     "github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
     "github.com/cockroachdb/cockroach/pkg/util/leaktest"
     "github.com/cockroachdb/cockroach/pkg/util/log"
+    "github.com/cockroachdb/cockroach/pkg/util/tracing"
+    "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
     "github.com/cockroachdb/errors"
     "github.com/cockroachdb/pebble/vfs"
     "github.com/gogo/protobuf/proto"
@@ -132,6 +135,65 @@ func TestRaftLogQueue(t *testing.T) {
     }
 }
 
+func TestRaftTracing(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    defer log.Scope(t).Close(t)
+
+    // TODO(baptist): Remove this once we change the default to be enabled.
+    st := cluster.MakeTestingClusterSettings()
+    rafttrace.MaxConcurrentRaftTraces.Override(context.Background(), &st.SV, 10)
+
+    tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
+        ReplicationMode: base.ReplicationManual,
+        ServerArgs: base.TestServerArgs{
+            Settings: st,
+            RaftConfig: base.RaftConfig{
+                RangeLeaseDuration:       24 * time.Hour, // disable lease moves
+                RaftElectionTimeoutTicks: 1 << 30,        // disable elections
+            },
+        },
+    })
+    defer tc.Stopper().Stop(context.Background())
+    store := tc.GetFirstStoreFromServer(t, 0)
+
+    // Write a single value to ensure we have a leader on n1.
+    key := tc.ScratchRange(t)
+    _, pErr := kv.SendWrapped(context.Background(), store.TestSender(), putArgs(key, []byte("value")))
+    require.NoError(t, pErr.GoError())
+    require.NoError(t, tc.WaitForSplitAndInitialization(key))
+    // Set to have 3 voters.
+    tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...)
+    tc.WaitForVotersOrFatal(t, key, tc.Targets(1, 2)...)
+
+    for i := 0; i < 100; i++ {
+        var finish func() tracingpb.Recording
+        ctx := context.Background()
+        if i == 50 {
+            // Trace a random request on a "client" tracer.
+            ctx, finish = tracing.ContextWithRecordingSpan(ctx, store.GetStoreConfig().Tracer(), "test")
+        }
+        _, pErr := kv.SendWrapped(ctx, store.TestSender(), putArgs(key, []byte(fmt.Sprintf("value-%d", i))))
+        require.NoError(t, pErr.GoError())
+        // Note that this is the client's span; there may be additional logs
+        // created after the span is returned.
+        if finish != nil {
+            output := finish().String()
+            // NB: It is hard to get all the messages in an expected order. We
+            // simply ensure some of the key messages are returned. Also note
+            // that we want to make sure that the logs are not reported against
+            // the tracing library, but against the line that called into it.
+            expectedMessages := []string{
+                `replica_proposal_buf.* flushing proposal to Raft`,
+                `replica_proposal_buf.* registering local trace`,
+                `replica_raft.* 1->2 MsgApp`,
+                `replica_raft.* 1->3 MsgApp`,
+                `replica_raft.* AppendThread->1 MsgStorageAppendResp`,
+                `ack-ing replication success to the client`,
+            }
+            require.NoError(t, testutils.MatchInOrder(output, expectedMessages...))
+        }
+    }
+}
+
 // TestCrashWhileTruncatingSideloadedEntries emulates a process crash in the
 // middle of applying a raft log truncation command that removes some entries
 // from the sideloaded storage. The test expects that storage remains in a
diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index 1eb4a3369dfc..0b2fbc22408d 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -31,6 +31,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
+    "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
@@ -891,6 +892,10 @@ type Replica struct {
         // MsgAppPull <=> LazyReplication.
         // Updated with both raftMu and mu held.
         currentRACv2Mode rac2.RaftMsgAppMode
+
+        // raftTracer is used to trace raft messages that are sent with a
+        // tracing context.
+        raftTracer rafttrace.RaftTracer
     }
 
     // The raft log truncations that are pending. Access is protected by its own
diff --git a/pkg/kv/kvserver/replica_destroy.go b/pkg/kv/kvserver/replica_destroy.go
index 553b5e012fd7..3d730e240094 100644
--- a/pkg/kv/kvserver/replica_destroy.go
+++ b/pkg/kv/kvserver/replica_destroy.go
@@ -181,4 +181,5 @@ func (r *Replica) disconnectReplicationRaftMuLocked(ctx context.Context) {
         log.Fatalf(ctx, "removing raft group before destroying replica %s", r)
     }
     r.mu.internalRaftGroup = nil
+    r.mu.raftTracer.Close()
 }
diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go
index 0a802d3982c3..193f98f9e57c 100644
--- a/pkg/kv/kvserver/replica_init.go
+++ b/pkg/kv/kvserver/replica_init.go
@@ -21,6 +21,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
+    "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
     "github.com/cockroachdb/cockroach/pkg/raft"
@@ -328,6 +329,7 @@ func (r *Replica) initRaftGroupRaftMuLockedReplicaMuLocked() error {
         return err
     }
     r.mu.internalRaftGroup = rg
+    r.mu.raftTracer = *rafttrace.NewRaftTracer(ctx, r.Tracer, r.ClusterSettings(), &r.store.concurrentRaftTraces)
     r.flowControlV2.InitRaftLocked(
         ctx, replica_rac2.NewRaftNode(rg, (*replicaForRACv2)(r)), rg.LogMark())
     return nil
diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index 5b5b8bc1eb0b..c0261d33b8eb 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -118,10 +118,10 @@ type ProposalData struct {
     // for best coverage. `p.ctx` should be used when a `replicatedCmd` is not in
     // scope, i.e. outside of raft command application.
     //
-    // The context may by updated during the proposal lifecycle but will never
+    // The context may be updated during the proposal lifecycle but will never
     // be nil. To clear out the context, set it to context.Background(). It is
     // protected by an atomic pointer because it can be read without holding the
-    // raftMu use ProposalData.Context() to read it.
+    // raftMu. Use ProposalData.Context() to read it.
     //
     // TODO(baptist): Track down all the places where we read and write ctx and
     // determine whether we can convert this back to non-atomic field.
diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go
index a55cbc122ce3..5d96eceb0098 100644
--- a/pkg/kv/kvserver/replica_proposal_buf.go
+++ b/pkg/kv/kvserver/replica_proposal_buf.go
@@ -126,6 +126,7 @@ type singleBatchProposer interface {
     getReplicaID() roachpb.ReplicaID
     flowControlHandle(ctx context.Context) kvflowcontrol.Handle
     onErrProposalDropped([]raftpb.Entry, []*ProposalData, raftpb.StateType)
+    registerForTracing(*ProposalData, raftpb.Entry) bool
 }
 
 // A proposer is an object that uses a propBuf to coordinate Raft proposals.
@@ -874,19 +875,29 @@ func proposeBatch(
         p.onErrProposalDropped(ents, props, raftGroup.BasicStatus().RaftState)
         return nil //nolint:returnerrcheck
     }
-    if err == nil {
-        // Now that we know what raft log position[1] this proposal is to end up
-        // in, deduct flow tokens for it. This is done without blocking (we've
-        // already waited for available flow tokens pre-evaluation). The tokens
-        // will later be returned once we're informed of the entry being
-        // admitted below raft.
-        //
-        // [1]: We're relying on an undocumented side effect of upstream raft
-        //      API where it populates the index and term for the passed in
-        //      slice of entries. See etcd-io/raft#57.
-        maybeDeductFlowTokens(ctx, p.flowControlHandle(ctx), handles, ents)
+    if err != nil {
+        return err
     }
-    return err
+    // Now that we know what raft log position[1] this proposal is to end up
+    // in, deduct flow tokens for it. This is done without blocking (we've
+    // already waited for available flow tokens pre-evaluation). The tokens
+    // will later be returned once we're informed of the entry being
+    // admitted below raft.
+    //
+    // [1]: We're relying on an undocumented side effect of upstream raft
+    //      API where it populates the index and term for the passed in
+    //      slice of entries. See etcd-io/raft#57.
+    maybeDeductFlowTokens(ctx, p.flowControlHandle(ctx), handles, ents)
+
+    // Register the proposal with rafttrace. This will add the trace to the raft
+    // lifecycle. We trace at most one entry per batch, so break after the first
+    // one is successfully registered.
+    for i := range ents {
+        if p.registerForTracing(props[i], ents[i]) {
+            break
+        }
+    }
+    return nil
 }
 
 func maybeDeductFlowTokens(
@@ -1173,6 +1184,10 @@ func (rp *replicaProposer) closedTimestampTarget() hlc.Timestamp {
     return (*Replica)(rp).closedTimestampTargetRLocked()
 }
 
+func (rp *replicaProposer) registerForTracing(p *ProposalData, e raftpb.Entry) bool {
+    return (*Replica)(rp).mu.raftTracer.MaybeRegister(p.Context(), e)
+}
+
 func (rp *replicaProposer) withGroupLocked(fn func(raftGroup proposerRaft) error) error {
     return (*Replica)(rp).withRaftGroupLocked(func(raftGroup *raft.RawNode) (bool, error) {
         // We're proposing a command here so there is no need to wake the leader
diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go
index 51d72be45a23..126febaaa575 100644
--- a/pkg/kv/kvserver/replica_proposal_buf_test.go
+++ b/pkg/kv/kvserver/replica_proposal_buf_test.go
@@ -217,6 +217,8 @@ func (t *testProposer) campaignLocked(ctx context.Context) {
     }
 }
 
+func (t *testProposer) registerForTracing(*ProposalData, raftpb.Entry) bool { return true }
+
 func (t *testProposer) rejectProposalWithErrLocked(_ context.Context, _ *ProposalData, err error) {
     if t.onRejectProposalWithErrLocked == nil {
         panic(fmt.Sprintf("unexpected rejectProposalWithErrLocked call: err=%v", err))
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index 5e6e2a9720d8..165d2072cdaa 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -638,6 +638,11 @@ func (r *Replica) stepRaftGroupRaftMuLocked(req *kvserverpb.RaftMessageRequest)
     var sideChannelInfo replica_rac2.SideChannelInfoUsingRaftMessageRequest
     var admittedVector rac2.AdmittedVector
     err := r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) {
+        // If this message requested tracing, begin tracing it.
+        for _, e := range req.TracedEntries {
+            r.mu.raftTracer.RegisterRemote(e)
+        }
+        r.mu.raftTracer.MaybeTrace(req.Message)
         // We're processing an incoming raft message (from a batch that may
         // include MsgVotes), so don't campaign if we wake up our raft
         // group.
@@ -1212,6 +1217,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
         }
     }
 
+    r.mu.raftTracer.MaybeTrace(msgStorageAppend)
     if state, err = s.StoreEntries(ctx, state, app, cb, &stats.append); err != nil {
         return stats, errors.Wrap(err, "while storing log entries")
     }
@@ -1243,6 +1249,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 
     stats.tApplicationBegin = timeutil.Now()
     if hasMsg(msgStorageApply) {
+        r.mu.raftTracer.MaybeTrace(msgStorageApply)
         r.traceEntries(msgStorageApply.Entries, "committed, before applying any entries")
 
         err := appTask.ApplyCommittedEntries(ctx)
@@ -1992,6 +1999,7 @@ func (r *Replica) deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked(
     }
 
     for i, m := range localMsgs {
+        r.mu.raftTracer.MaybeTrace(m)
         if err := raftGroup.Step(m); err != nil {
             log.Fatalf(ctx, "unexpected error stepping local raft message [%s]: %v",
                 raft.DescribeMessage(m, raftEntryFormatter), err)
@@ -2015,6 +2023,7 @@ func (r *Replica) sendRaftMessage(
     lastToReplica, lastFromReplica := r.getLastReplicaDescriptors()
 
     r.mu.RLock()
+    traced := r.mu.raftTracer.MaybeTrace(msg)
     fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.From), lastToReplica)
     toReplica, toErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.To), lastFromReplica)
     var startKey roachpb.RKey
@@ -2067,6 +2076,7 @@ func (r *Replica) sendRaftMessage(
         RangeStartKey:       startKey, // usually nil
         UsingRac2Protocol:   r.flowControlV2.GetEnabledWhenLeader() >= kvflowcontrol.V2EnabledWhenLeaderV1Encoding,
         LowPriorityOverride: lowPriorityOverride,
+        TracedEntries:       traced,
     }
     // For RACv2, annotate successful MsgAppResp messages with the vector of
     // admitted log indices, by priority.
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 3ec55dbaf5bb..46867e63aee3 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -942,6 +942,11 @@ type Store struct {
     // has likely improved).
     draining atomic.Bool
 
+    // concurrentRaftTraces is the number of concurrent raft trace requests that
+    // are currently registered. This count is capped to prevent extensive raft
+    // tracing from inadvertently impacting performance.
+    concurrentRaftTraces atomic.Int64
+
     // Locking notes: To avoid deadlocks, the following lock order must be
     // obeyed: baseQueue.mu < Replica.raftMu < Replica.readOnlyCmdMu < Store.mu
     // < Replica.mu < Replica.unreachablesMu < Store.coalescedMu < Store.scheduler.mu.
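Editor's note: the new Store.concurrentRaftTraces counter is a store-wide cap on how many raft traces can be registered at once, with rafttrace.MaxConcurrentRaftTraces (overridden in the test at the top of this diff) supplying the limit. Below is a minimal, self-contained sketch of that "atomic counter with a cap" pattern. The names traceLimiter, tryRegister, and unregister are illustrative only and are not the rafttrace package's actual API; this is an assumption about how the limit is likely enforced, not the PR's implementation.

package main

import (
	"fmt"
	"sync/atomic"
)

// traceLimiter caps how many traces may be registered at once so that
// tracing cannot grow without bound and slow down the raft hot path.
type traceLimiter struct {
	current atomic.Int64 // number of currently registered traces
	max     int64        // maximum allowed concurrent registrations
}

// tryRegister claims a slot and returns true on success. A caller that gets
// true must call unregister once its trace finishes.
func (l *traceLimiter) tryRegister() bool {
	for {
		cur := l.current.Load()
		if cur >= l.max {
			return false // at the limit; skip tracing this proposal
		}
		if l.current.CompareAndSwap(cur, cur+1) {
			return true
		}
		// Raced with another registration; reload the counter and retry.
	}
}

// unregister releases a previously claimed slot.
func (l *traceLimiter) unregister() {
	l.current.Add(-1)
}

func main() {
	l := &traceLimiter{max: 2}
	fmt.Println(l.tryRegister()) // true
	fmt.Println(l.tryRegister()) // true
	fmt.Println(l.tryRegister()) // false: limit reached
	l.unregister()
	fmt.Println(l.tryRegister()) // true again once a slot frees up
}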