Skip to content

Commit

Permalink
raft: add tracing to important raft points
Browse files Browse the repository at this point in the history
This commit adds tracing to the various places within raft where state
transitions happen. When a message is first proposed, it matches the
context of the original message to the raft entry and registers it for
tracing. Then it adds to the trace at the key points during the
transitions and finally stops tracing once a `MsgStorageApplyResp` is
received past the entry.

Fixes: cockroachdb#104035

Release note: None
  • Loading branch information
andrewbaptist committed Oct 18, 2024
1 parent fc7487e commit a58e2a9
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 14 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ go_library(
"//pkg/kv/kvserver/multiqueue",
"//pkg/kv/kvserver/raftentry",
"//pkg/kv/kvserver/raftlog",
"//pkg/kv/kvserver/rafttrace",
"//pkg/kv/kvserver/rangefeed",
"//pkg/kv/kvserver/rditer",
"//pkg/kv/kvserver/readsummary",
Expand Down Expand Up @@ -438,6 +439,7 @@ go_test(
"//pkg/kv/kvserver/protectedts/ptutil",
"//pkg/kv/kvserver/raftentry",
"//pkg/kv/kvserver/raftlog",
"//pkg/kv/kvserver/rafttrace",
"//pkg/kv/kvserver/raftutil",
"//pkg/kv/kvserver/rangefeed",
"//pkg/kv/kvserver/rditer",
Expand Down
62 changes: 62 additions & 0 deletions pkg/kv/kvserver/client_raft_log_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
Expand All @@ -33,6 +34,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/vfs"
"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -132,6 +135,65 @@ func TestRaftLogQueue(t *testing.T) {
}
}

func TestRaftTracing(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// TODO(baptist): Remove this once we change the default to be enabled.
st := cluster.MakeTestingClusterSettings()
rafttrace.MaxConcurrentRaftTraces.Override(context.Background(), &st.SV, 10)

tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Settings: st,
RaftConfig: base.RaftConfig{
RangeLeaseDuration: 24 * time.Hour, // disable lease moves
RaftElectionTimeoutTicks: 1 << 30, // disable elections
},
},
})
defer tc.Stopper().Stop(context.Background())
store := tc.GetFirstStoreFromServer(t, 0)

// Write a single value to ensure we have a leader on n1.
key := tc.ScratchRange(t)
_, pErr := kv.SendWrapped(context.Background(), store.TestSender(), putArgs(key, []byte("value")))
require.NoError(t, pErr.GoError())
require.NoError(t, tc.WaitForSplitAndInitialization(key))
// Set to have 3 voters.
tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...)
tc.WaitForVotersOrFatal(t, key, tc.Targets(1, 2)...)

for i := 0; i < 100; i++ {
var finish func() tracingpb.Recording
ctx := context.Background()
if i == 50 {
// Trace a random request on a "client" tracer.
ctx, finish = tracing.ContextWithRecordingSpan(ctx, store.GetStoreConfig().Tracer(), "test")
}
_, pErr := kv.SendWrapped(ctx, store.TestSender(), putArgs(key, []byte(fmt.Sprintf("value-%d", i))))
require.NoError(t, pErr.GoError())
// Note that this is the clients span, there may be additional logs created after the span is returned.
if finish != nil {
output := finish().String()
// NB: It is hard to get all the messages in an expected order. We
// simply ensure some of the key messages are returned. Also note
// that we want to make sure that the logs are not reported against
// the tracing library, but the line that called into it.
expectedMessages := []string{
`replica_proposal_buf.* flushing proposal to Raft`,
`replica_proposal_buf.* registering local trace`,
`replica_raft.* 1->2 MsgApp`,
`replica_raft.* 1->3 MsgApp`,
`replica_raft.* AppendThread->1 MsgStorageAppendResp`,
`ack-ing replication success to the client`,
}
require.NoError(t, testutils.MatchInOrder(output, expectedMessages...))
}
}
}

// TestCrashWhileTruncatingSideloadedEntries emulates a process crash in the
// middle of applying a raft log truncation command that removes some entries
// from the sideloaded storage. The test expects that storage remains in a
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/load"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
Expand Down Expand Up @@ -891,6 +892,10 @@ type Replica struct {
// MsgAppPull <=> LazyReplication.
// Updated with both raftMu and mu held.
currentRACv2Mode rac2.RaftMsgAppMode

// raftTracer is used to trace raft messages that are sent with a
// tracing context.
raftTracer rafttrace.RaftTracer
}

// The raft log truncations that are pending. Access is protected by its own
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,5 @@ func (r *Replica) disconnectReplicationRaftMuLocked(ctx context.Context) {
log.Fatalf(ctx, "removing raft group before destroying replica %s", r)
}
r.mu.internalRaftGroup = nil
r.mu.raftTracer.Close()
}
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/load"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rafttrace"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/raft"
Expand Down Expand Up @@ -328,6 +329,7 @@ func (r *Replica) initRaftGroupRaftMuLockedReplicaMuLocked() error {
return err
}
r.mu.internalRaftGroup = rg
r.mu.raftTracer = *rafttrace.NewRaftTracer(ctx, r.Tracer, r.ClusterSettings(), &r.store.concurrentRaftTraces)
r.flowControlV2.InitRaftLocked(
ctx, replica_rac2.NewRaftNode(rg, (*replicaForRACv2)(r)), rg.LogMark())
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ type ProposalData struct {
// for best coverage. `p.ctx` should be used when a `replicatedCmd` is not in
// scope, i.e. outside of raft command application.
//
// The context may by updated during the proposal lifecycle but will never
// The context may be updated during the proposal lifecycle but will never
// be nil. To clear out the context, set it to context.Background(). It is
// protected by an atomic pointer because it can be read without holding the
// raftMu use ProposalData.Context() to read it.
// raftMu. Use ProposalData.Context() to read it.
//
// TODO(baptist): Track down all the places where we read and write ctx and
// determine whether we can convert this back to non-atomic field.
Expand Down
39 changes: 27 additions & 12 deletions pkg/kv/kvserver/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ type singleBatchProposer interface {
getReplicaID() roachpb.ReplicaID
flowControlHandle(ctx context.Context) kvflowcontrol.Handle
onErrProposalDropped([]raftpb.Entry, []*ProposalData, raftpb.StateType)
registerForTracing(*ProposalData, raftpb.Entry) bool
}

// A proposer is an object that uses a propBuf to coordinate Raft proposals.
Expand Down Expand Up @@ -874,19 +875,29 @@ func proposeBatch(
p.onErrProposalDropped(ents, props, raftGroup.BasicStatus().RaftState)
return nil //nolint:returnerrcheck
}
if err == nil {
// Now that we know what raft log position[1] this proposal is to end up
// in, deduct flow tokens for it. This is done without blocking (we've
// already waited for available flow tokens pre-evaluation). The tokens
// will later be returned once we're informed of the entry being
// admitted below raft.
//
// [1]: We're relying on an undocumented side effect of upstream raft
// API where it populates the index and term for the passed in
// slice of entries. See etcd-io/raft#57.
maybeDeductFlowTokens(ctx, p.flowControlHandle(ctx), handles, ents)
if err != nil {
return err
}
return err
// Now that we know what raft log position[1] this proposal is to end up
// in, deduct flow tokens for it. This is done without blocking (we've
// already waited for available flow tokens pre-evaluation). The tokens
// will later be returned once we're informed of the entry being
// admitted below raft.
//
// [1]: We're relying on an undocumented side effect of upstream raft
// API where it populates the index and term for the passed in
// slice of entries. See etcd-io/raft#57.
maybeDeductFlowTokens(ctx, p.flowControlHandle(ctx), handles, ents)

// Register the proposal with rafttrace. This will add the trace to the raft
// lifecycle. We trace at most one entry per batch, so break after the first
// one is successfully registered.
for i := range ents {
if p.registerForTracing(props[i], ents[i]) {
break
}
}
return nil
}

func maybeDeductFlowTokens(
Expand Down Expand Up @@ -1173,6 +1184,10 @@ func (rp *replicaProposer) closedTimestampTarget() hlc.Timestamp {
return (*Replica)(rp).closedTimestampTargetRLocked()
}

func (rp *replicaProposer) registerForTracing(p *ProposalData, e raftpb.Entry) bool {
return (*Replica)(rp).mu.raftTracer.MaybeRegister(p.Context(), e)
}

func (rp *replicaProposer) withGroupLocked(fn func(raftGroup proposerRaft) error) error {
return (*Replica)(rp).withRaftGroupLocked(func(raftGroup *raft.RawNode) (bool, error) {
// We're proposing a command here so there is no need to wake the leader
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/replica_proposal_buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ func (t *testProposer) campaignLocked(ctx context.Context) {
}
}

func (t *testProposer) registerForTracing(*ProposalData, raftpb.Entry) bool { return true }

func (t *testProposer) rejectProposalWithErrLocked(_ context.Context, _ *ProposalData, err error) {
if t.onRejectProposalWithErrLocked == nil {
panic(fmt.Sprintf("unexpected rejectProposalWithErrLocked call: err=%v", err))
Expand Down
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,11 @@ func (r *Replica) stepRaftGroupRaftMuLocked(req *kvserverpb.RaftMessageRequest)
var sideChannelInfo replica_rac2.SideChannelInfoUsingRaftMessageRequest
var admittedVector rac2.AdmittedVector
err := r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) {
// If this message requested tracing, begin tracing it.
for _, e := range req.TracedEntries {
r.mu.raftTracer.RegisterRemote(e)
}
r.mu.raftTracer.MaybeTrace(req.Message)
// We're processing an incoming raft message (from a batch that may
// include MsgVotes), so don't campaign if we wake up our raft
// group.
Expand Down Expand Up @@ -1212,6 +1217,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
}
}

r.mu.raftTracer.MaybeTrace(msgStorageAppend)
if state, err = s.StoreEntries(ctx, state, app, cb, &stats.append); err != nil {
return stats, errors.Wrap(err, "while storing log entries")
}
Expand Down Expand Up @@ -1243,6 +1249,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(

stats.tApplicationBegin = timeutil.Now()
if hasMsg(msgStorageApply) {
r.mu.raftTracer.MaybeTrace(msgStorageApply)
r.traceEntries(msgStorageApply.Entries, "committed, before applying any entries")

err := appTask.ApplyCommittedEntries(ctx)
Expand Down Expand Up @@ -1992,6 +1999,7 @@ func (r *Replica) deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked(
}

for i, m := range localMsgs {
r.mu.raftTracer.MaybeTrace(m)
if err := raftGroup.Step(m); err != nil {
log.Fatalf(ctx, "unexpected error stepping local raft message [%s]: %v",
raft.DescribeMessage(m, raftEntryFormatter), err)
Expand All @@ -2015,6 +2023,7 @@ func (r *Replica) sendRaftMessage(
lastToReplica, lastFromReplica := r.getLastReplicaDescriptors()

r.mu.RLock()
traced := r.mu.raftTracer.MaybeTrace(msg)
fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.From), lastToReplica)
toReplica, toErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.To), lastFromReplica)
var startKey roachpb.RKey
Expand Down Expand Up @@ -2067,6 +2076,7 @@ func (r *Replica) sendRaftMessage(
RangeStartKey: startKey, // usually nil
UsingRac2Protocol: r.flowControlV2.GetEnabledWhenLeader() >= kvflowcontrol.V2EnabledWhenLeaderV1Encoding,
LowPriorityOverride: lowPriorityOverride,
TracedEntries: traced,
}
// For RACv2, annotate successful MsgAppResp messages with the vector of
// admitted log indices, by priority.
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,11 @@ type Store struct {
// has likely improved).
draining atomic.Bool

// concurrentRaftTraces is the number of concurrent raft trace requests that
// are currently registered. This limit is used to prevent extensive raft
// tracing from inadvertently impacting performance.
concurrentRaftTraces atomic.Int64

// Locking notes: To avoid deadlocks, the following lock order must be
// obeyed: baseQueue.mu < Replica.raftMu < Replica.readOnlyCmdMu < Store.mu
// < Replica.mu < Replica.unreachablesMu < Store.coalescedMu < Store.scheduler.mu.
Expand Down

0 comments on commit a58e2a9

Please sign in to comment.