From 34eb57a26541d995f0dc01521aec057af5d5660d Mon Sep 17 00:00:00 2001 From: adityamaru Date: Thu, 21 Sep 2023 14:52:23 -0400 Subject: [PATCH] bulk: add histograms to ingestion performance stats This change adds two histograms to the IngestionPerformanceStats. A histogram that tracks the batch wait, and one that tracks the SST data size. This change also adds a count for the number of ssts that are ingested as write batches. Release note: None --- pkg/BUILD.bazel | 2 + pkg/kv/bulk/bulkpb/BUILD.bazel | 24 ++- pkg/kv/bulk/bulkpb/bulk.go | 60 +++++++ pkg/kv/bulk/bulkpb/bulkpb.proto | 22 +++ .../bulkpb/ingestion_performance_stats.go | 51 ++++++ .../ingestion_performance_stats_test.go | 158 ++++++++++++++++++ pkg/kv/bulk/sst_batcher.go | 1 + pkg/util/bulk/BUILD.bazel | 2 - pkg/util/bulk/tracing_aggregator_test.go | 78 --------- 9 files changed, 316 insertions(+), 82 deletions(-) create mode 100644 pkg/kv/bulk/bulkpb/bulk.go create mode 100644 pkg/kv/bulk/bulkpb/ingestion_performance_stats_test.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 52b19b4455e..6475279ac83 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -189,6 +189,7 @@ ALL_TESTS = [ "//pkg/jobs:jobs_test", "//pkg/keys:keys_test", "//pkg/keyvisualizer/spanstatsconsumer:spanstatsconsumer_test", + "//pkg/kv/bulk/bulkpb:bulkpb_test", "//pkg/kv/bulk:bulk_test", "//pkg/kv/kvclient/kvcoord:kvcoord_disallowed_imports_test", "//pkg/kv/kvclient/kvcoord:kvcoord_test", @@ -1266,6 +1267,7 @@ GO_TARGETS = [ "//pkg/keyvisualizer/spanstatskvaccessor:spanstatskvaccessor", "//pkg/keyvisualizer:keyvisualizer", "//pkg/kv/bulk/bulkpb:bulkpb", + "//pkg/kv/bulk/bulkpb:bulkpb_test", "//pkg/kv/bulk:bulk", "//pkg/kv/bulk:bulk_test", "//pkg/kv/kvbase:kvbase", diff --git a/pkg/kv/bulk/bulkpb/BUILD.bazel b/pkg/kv/bulk/bulkpb/BUILD.bazel index db00be4dc41..9e272cbe06b 100644 --- a/pkg/kv/bulk/bulkpb/BUILD.bazel +++ b/pkg/kv/bulk/bulkpb/BUILD.bazel @@ -1,5 +1,5 @@ load("@rules_proto//proto:defs.bzl", "proto_library") -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") proto_library( @@ -27,7 +27,10 @@ go_proto_library( go_library( name = "bulkpb", - srcs = ["ingestion_performance_stats.go"], + srcs = [ + "bulk.go", + "ingestion_performance_stats.go", + ], embed = [":bulkpb_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/kv/bulk/bulkpb", visibility = ["//visibility:public"], @@ -38,7 +41,24 @@ go_library( "//pkg/util/humanizeutil", "//pkg/util/log", "@com_github_cockroachdb_redact//:redact", + "@com_github_codahale_hdrhistogram//:hdrhistogram", "@com_github_gogo_protobuf//proto", "@io_opentelemetry_go_otel//attribute", ], ) + +go_test( + name = "bulkpb_test", + srcs = ["ingestion_performance_stats_test.go"], + args = ["-test.timeout=295s"], + embed = [":bulkpb"], + deps = [ + "//pkg/roachpb", + "//pkg/util/bulk", + "//pkg/util/tracing", + "//pkg/util/tracing/tracingpb", + "@com_github_codahale_hdrhistogram//:hdrhistogram", + "@com_github_gogo_protobuf//proto", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/kv/bulk/bulkpb/bulk.go b/pkg/kv/bulk/bulkpb/bulk.go new file mode 100644 index 00000000000..f4e67bcf665 --- /dev/null +++ b/pkg/kv/bulk/bulkpb/bulk.go @@ -0,0 +1,60 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package bulkpb + +import ( + "fmt" + "strings" + "time" + + "github.com/codahale/hdrhistogram" +) + +type HistogramDataType int + +const ( + HistogramDataTypeLatency HistogramDataType = iota + HistogramDataTypeBytes +) + +const mb = 1 << 20 + +// String implements the stringer interface. +func (h *HistogramData) String() string { + var b strings.Builder + hist := hdrhistogram.Import(&hdrhistogram.Snapshot{ + LowestTrackableValue: h.LowestTrackableValue, + HighestTrackableValue: h.HighestTrackableValue, + SignificantFigures: h.SignificantFigures, + Counts: h.Counts, + }) + + stringify := func(denominator float64) { + b.WriteString(fmt.Sprintf("min: %.6f\n", float64(hist.Min())/denominator)) + b.WriteString(fmt.Sprintf("max: %.6f\n", float64(hist.Max())/denominator)) + b.WriteString(fmt.Sprintf("p5: %.6f\n", float64(hist.ValueAtQuantile(5))/denominator)) + b.WriteString(fmt.Sprintf("p50: %.6f\n", float64(hist.ValueAtQuantile(50))/denominator)) + b.WriteString(fmt.Sprintf("p90: %.6f\n", float64(hist.ValueAtQuantile(90))/denominator)) + b.WriteString(fmt.Sprintf("p99: %.6f\n", float64(hist.ValueAtQuantile(99))/denominator)) + b.WriteString(fmt.Sprintf("p99_9: %.6f\n", float64(hist.ValueAtQuantile(99.9))/denominator)) + b.WriteString(fmt.Sprintf("mean: %.6f\n", float32(hist.Mean())/float32(denominator))) + b.WriteString(fmt.Sprintf("count: %d\n", hist.TotalCount())) + } + + switch h.DataType { + case HistogramDataTypeLatency: + stringify(float64(time.Second)) + case HistogramDataTypeBytes: + stringify(float64(mb)) + } + + return b.String() +} diff --git a/pkg/kv/bulk/bulkpb/bulkpb.proto b/pkg/kv/bulk/bulkpb/bulkpb.proto index f0bf5ba4518..4551f185e2d 100644 --- a/pkg/kv/bulk/bulkpb/bulkpb.proto +++ b/pkg/kv/bulk/bulkpb/bulkpb.proto @@ -16,6 +16,16 @@ option go_package = "github.com/cockroachdb/cockroach/pkg/kv/bulk/bulkpb"; import "gogoproto/gogo.proto"; import "util/hlc/timestamp.proto"; +message HistogramData { + option (gogoproto.goproto_stringer) = false; + + int32 data_type = 5 [(gogoproto.casttype) = "HistogramDataType"]; + int64 lowest_trackable_value = 1; + int64 highest_trackable_value = 2; + int64 significant_figures = 3; + repeated int64 counts = 4; +} + // IngestionPerformanceStats is a message containing information about the // creation of SSTables by an SSTBatcher or BufferingAdder. message IngestionPerformanceStats { @@ -67,11 +77,15 @@ message IngestionPerformanceStats { // BatchWait is the time spent flushing batches (inc split/scatter/send). int64 batch_wait = 14 [(gogoproto.casttype) = "time.Duration"]; + // BatchWaitHist is a histogram of the time spent flushing batches. + HistogramData batch_wait_hist = 24; + // SendWait is the time spent sending batches (addsstable+retries) int64 send_wait = 15 [(gogoproto.casttype) = "time.Duration"]; // SplitWait is the time spent splitting. int64 split_wait = 16 [(gogoproto.casttype) = "time.Duration"]; + // ScatterWait is the time spent scattering. int64 scatter_wait = 17 [(gogoproto.casttype) = "time.Duration"]; @@ -91,4 +105,12 @@ message IngestionPerformanceStats { // SendWaitByStore is the time spent sending batches to each store. 
   map<int32, int64> send_wait_by_store = 20 [(gogoproto.castkey) = "github.com/cockroachdb/cockroach/pkg/roachpb.StoreID",
     (gogoproto.castvalue) = "time.Duration"];
+
+  // AsWrites is the number of AddSSTable requests that are ingested as a write
+  // batch.
+  int64 as_writes = 25;
+
+  // SSTSizeHist is a histogram of the sizes of the SSTs sent to KV for
+  // ingestion via AddSSTable requests.
+  HistogramData sst_size_hist = 26;
 }
diff --git a/pkg/kv/bulk/bulkpb/ingestion_performance_stats.go b/pkg/kv/bulk/bulkpb/ingestion_performance_stats.go
index 658fb48ab48..76af0de6638 100644
--- a/pkg/kv/bulk/bulkpb/ingestion_performance_stats.go
+++ b/pkg/kv/bulk/bulkpb/ingestion_performance_stats.go
@@ -24,12 +24,22 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/redact"
+	"github.com/codahale/hdrhistogram"
 	"github.com/gogo/protobuf/proto"
 	"go.opentelemetry.io/otel/attribute"
 )
 
 var _ bulk.TracingAggregatorEvent = (*IngestionPerformanceStats)(nil)
 
+const (
+	sigFigs = 1
+	minLatency = time.Millisecond
+	maxLatency = 100 * time.Second
+
+	minBytes = 1024 // 1 KB
+	maxBytes = 256 * 1024 * 1024 // 256 MB
+)
+
 // Identity implements the TracingAggregatorEvent interface.
 func (s *IngestionPerformanceStats) Identity() bulk.TracingAggregatorEvent {
 	stats := IngestionPerformanceStats{
@@ -40,6 +50,38 @@ func (s *IngestionPerformanceStats) Identity() bulk.TracingAggregatorEvent {
 	return &stats
 }
 
+// getCombinedHist returns a new HistogramData that contains the currentHist
+// combined with the recordValue. If currentHist is nil, a new histogram is
+// initialized.
+func getCombinedHist(
+	currentHist *HistogramData, recordValue int64, dataType HistogramDataType,
+) *HistogramData {
+	var hist *hdrhistogram.Histogram
+	if currentHist != nil {
+		hist = hdrhistogram.Import(&hdrhistogram.Snapshot{
+			LowestTrackableValue: currentHist.LowestTrackableValue,
+			HighestTrackableValue: currentHist.HighestTrackableValue,
+			SignificantFigures: currentHist.SignificantFigures,
+			Counts: currentHist.Counts,
+		})
+	} else if dataType == HistogramDataTypeLatency {
+		hist = hdrhistogram.New(minLatency.Nanoseconds(),
+			maxLatency.Nanoseconds(), sigFigs)
+	} else if dataType == HistogramDataTypeBytes {
+		hist = hdrhistogram.New(minBytes, maxBytes, sigFigs)
+	}
+	_ = hist.RecordValue(recordValue)
+	// Return the snapshot of this new merged histogram.
+	cumulativeSnapshot := hist.Export()
+	return &HistogramData{
+		DataType: dataType,
+		LowestTrackableValue: cumulativeSnapshot.LowestTrackableValue,
+		HighestTrackableValue: cumulativeSnapshot.HighestTrackableValue,
+		SignificantFigures: cumulativeSnapshot.SignificantFigures,
+		Counts: cumulativeSnapshot.Counts,
+	}
+}
+
 // Combine implements the TracingAggregatorEvent interface.
func (s *IngestionPerformanceStats) Combine(other bulk.TracingAggregatorEvent) { otherStats, ok := other.(*IngestionPerformanceStats) @@ -66,6 +108,12 @@ func (s *IngestionPerformanceStats) Combine(other bulk.TracingAggregatorEvent) { s.SplitWait += otherStats.SplitWait s.ScatterWait += otherStats.ScatterWait s.CommitWait += otherStats.CommitWait + s.AsWrites += otherStats.AsWrites + + s.BatchWaitHist = getCombinedHist(s.BatchWaitHist, + otherStats.BatchWait.Nanoseconds(), HistogramDataTypeLatency) + s.SstSizeHist = getCombinedHist(s.SstSizeHist, + otherStats.SSTDataSize, HistogramDataTypeBytes) // Duration should not be used in throughput calculations as adding durations // of multiple flushes does not account for concurrent execution of these @@ -126,6 +174,7 @@ func (s *IngestionPerformanceStats) String() string { if s.SSTDataSize > 0 { sstDataSizeMB := float64(s.SSTDataSize) / mb b.WriteString(fmt.Sprintf("sst_data_size: %.2f MB\n", sstDataSizeMB)) + b.WriteString(fmt.Sprintf("sst_data_hist:\n%s\n", s.SstSizeHist.String())) if !s.CurrentFlushTime.IsEmpty() && !s.LastFlushTime.IsEmpty() { duration := s.CurrentFlushTime.GoTime().Sub(s.LastFlushTime.GoTime()) @@ -138,6 +187,7 @@ func (s *IngestionPerformanceStats) String() string { timeString(&b, "sort_wait", s.SortWait) timeString(&b, "flush_wait", s.FlushWait) timeString(&b, "batch_wait", s.BatchWait) + b.WriteString(fmt.Sprintf("batch_wait_hist:\n%s\n", s.BatchWaitHist.String())) timeString(&b, "send_wait", s.SendWait) timeString(&b, "split_wait", s.SplitWait) timeString(&b, "scatter_wait", s.ScatterWait) @@ -146,6 +196,7 @@ func (s *IngestionPerformanceStats) String() string { b.WriteString(fmt.Sprintf("splits: %d\n", s.Splits)) b.WriteString(fmt.Sprintf("scatters: %d\n", s.Scatters)) b.WriteString(fmt.Sprintf("scatter_moved: %d\n", s.ScatterMoved)) + b.WriteString(fmt.Sprintf("as_writes: %d\n", s.AsWrites)) // Sort store send wait by IDs before adding them as tags. ids := make(roachpb.StoreIDSlice, 0, len(s.SendWaitByStore)) diff --git a/pkg/kv/bulk/bulkpb/ingestion_performance_stats_test.go b/pkg/kv/bulk/bulkpb/ingestion_performance_stats_test.go new file mode 100644 index 00000000000..a95c5bfa0d6 --- /dev/null +++ b/pkg/kv/bulk/bulkpb/ingestion_performance_stats_test.go @@ -0,0 +1,158 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+
+package bulkpb
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/util/bulk"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
+	"github.com/codahale/hdrhistogram"
+	proto "github.com/gogo/protobuf/proto"
+	"github.com/stretchr/testify/require"
+)
+
+func TestIngestionPerformanceStatsAggregation(t *testing.T) {
+	tr := tracing.NewTracer()
+	ctx := context.Background()
+
+	makeEvent := func(v int64, sendWaitByStore map[roachpb.StoreID]time.Duration) *IngestionPerformanceStats {
+		return &IngestionPerformanceStats{
+			LogicalDataSize: v,
+			SSTDataSize: v,
+			BufferFlushes: v,
+			FlushesDueToSize: v,
+			Batches: v,
+			BatchesDueToRange: v,
+			BatchesDueToSize: v,
+			SplitRetries: v,
+			Splits: v,
+			Scatters: v,
+			ScatterMoved: v,
+			AsWrites: v,
+			FillWait: time.Duration(v),
+			SortWait: time.Duration(v),
+			FlushWait: time.Duration(v),
+			BatchWait: time.Duration(v),
+			SendWait: time.Duration(v),
+			SplitWait: time.Duration(v),
+			ScatterWait: time.Duration(v),
+			CommitWait: time.Duration(v),
+			Duration: time.Duration(v),
+			SendWaitByStore: sendWaitByStore,
+			BatchWaitHist: nil,
+			SstSizeHist: nil,
+		}
+	}
+
+	assertAggContainsStats := func(t *testing.T, agg *bulk.TracingAggregator, expected *IngestionPerformanceStats) {
+		agg.ForEachAggregatedEvent(func(name string, event bulk.TracingAggregatorEvent) {
+			require.Equal(t, name, proto.MessageName(expected))
+			var actual *IngestionPerformanceStats
+			var ok bool
+			if actual, ok = event.(*IngestionPerformanceStats); !ok {
+				t.Fatal("failed to cast event to expected type")
+			}
+			require.Equal(t, expected, actual)
+		})
+	}
+
+	// First, start a root tracing span with a tracing aggregator.
+	ctx, root := tr.StartSpanCtx(ctx, "root", tracing.WithRecording(tracingpb.RecordingVerbose))
+	defer root.Finish()
+	agg := bulk.TracingAggregatorForContext(ctx)
+	ctx, withListener := tr.StartSpanCtx(ctx, "withListener",
+		tracing.WithEventListeners(agg), tracing.WithParent(root))
+	defer withListener.Finish()
+
+	// Second, start a child span on the root that also has its own tracing
+	// aggregator.
+	child1Agg := bulk.TracingAggregatorForContext(ctx)
+	child1Ctx, child1AggSp := tr.StartSpanCtx(ctx, "child1",
+		tracing.WithEventListeners(child1Agg), tracing.WithParent(withListener))
+	defer child1AggSp.Finish()
+
+	// In addition, start a child span on the first child span.
+	_, child1Child := tracing.ChildSpan(child1Ctx, "child1Child")
+	defer child1Child.Finish()
+
+	// Finally, start a second child span on the root.
+	_, child2 := tracing.ChildSpan(ctx, "child2")
+	defer child2.Finish()
+
+	// Record a structured event on all child spans.
+	child1AggSp.RecordStructured(makeEvent(1, map[roachpb.StoreID]time.Duration{1: 101}))
+	child1Child.RecordStructured(makeEvent(2, map[roachpb.StoreID]time.Duration{1: 102, 2: 202}))
+	child2.RecordStructured(makeEvent(3, map[roachpb.StoreID]time.Duration{2: 203, 3: 303}))
+
+	// Verify that the root and child1 aggregators have the expected aggregated
+	// stats.
+ t.Run("child1Agg", func(t *testing.T) { + child1AggLatencyHist := hdrhistogram.New(minLatency.Nanoseconds(), maxLatency.Nanoseconds(), sigFigs) + require.NoError(t, child1AggLatencyHist.RecordValue(1)) + require.NoError(t, child1AggLatencyHist.RecordValue(2)) + child1AggLatencyHistSnapshot := child1AggLatencyHist.Export() + child1AggExpectedEvent := makeEvent(3, map[roachpb.StoreID]time.Duration{1: 203, 2: 202}) + child1AggExpectedEvent.BatchWaitHist = &HistogramData{ + DataType: HistogramDataTypeLatency, + LowestTrackableValue: child1AggLatencyHistSnapshot.LowestTrackableValue, + HighestTrackableValue: child1AggLatencyHistSnapshot.HighestTrackableValue, + SignificantFigures: child1AggLatencyHistSnapshot.SignificantFigures, + Counts: child1AggLatencyHistSnapshot.Counts, + } + child1AggDataHist := hdrhistogram.New(minBytes, maxBytes, sigFigs) + require.NoError(t, child1AggDataHist.RecordValue(1)) + require.NoError(t, child1AggDataHist.RecordValue(2)) + child1AggDataHistSnapshot := child1AggDataHist.Export() + child1AggExpectedEvent.SstSizeHist = &HistogramData{ + DataType: HistogramDataTypeBytes, + LowestTrackableValue: child1AggDataHistSnapshot.LowestTrackableValue, + HighestTrackableValue: child1AggDataHistSnapshot.HighestTrackableValue, + SignificantFigures: child1AggDataHistSnapshot.SignificantFigures, + Counts: child1AggDataHistSnapshot.Counts, + } + assertAggContainsStats(t, child1Agg, child1AggExpectedEvent) + }) + + t.Run("root", func(t *testing.T) { + aggLatencyHist := hdrhistogram.New(minLatency.Nanoseconds(), maxLatency.Nanoseconds(), sigFigs) + require.NoError(t, aggLatencyHist.RecordValue(1)) + require.NoError(t, aggLatencyHist.RecordValue(2)) + require.NoError(t, aggLatencyHist.RecordValue(3)) + aggLatencyHistSnapshot := aggLatencyHist.Export() + + aggExpectedEvent := makeEvent(6, map[roachpb.StoreID]time.Duration{1: 203, 2: 405, 3: 303}) + aggExpectedEvent.BatchWaitHist = &HistogramData{ + DataType: HistogramDataTypeLatency, + LowestTrackableValue: aggLatencyHistSnapshot.LowestTrackableValue, + HighestTrackableValue: aggLatencyHistSnapshot.HighestTrackableValue, + SignificantFigures: aggLatencyHistSnapshot.SignificantFigures, + Counts: aggLatencyHistSnapshot.Counts, + } + aggDataHist := hdrhistogram.New(minBytes, maxBytes, sigFigs) + require.NoError(t, aggDataHist.RecordValue(1)) + require.NoError(t, aggDataHist.RecordValue(2)) + require.NoError(t, aggDataHist.RecordValue(3)) + aggDataHistSnapshot := aggDataHist.Export() + aggExpectedEvent.SstSizeHist = &HistogramData{ + DataType: HistogramDataTypeBytes, + LowestTrackableValue: aggDataHistSnapshot.LowestTrackableValue, + HighestTrackableValue: aggDataHistSnapshot.HighestTrackableValue, + SignificantFigures: aggDataHistSnapshot.SignificantFigures, + Counts: aggDataHistSnapshot.Counts, + } + assertAggContainsStats(t, agg, aggExpectedEvent) + }) +} diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go index fc4eba74709..33a34d79354 100644 --- a/pkg/kv/bulk/sst_batcher.go +++ b/pkg/kv/bulk/sst_batcher.go @@ -797,6 +797,7 @@ func (b *SSTBatcher) addSSTable( if b.settings != nil && int64(len(item.sstBytes)) < tooSmallSSTSize.Get(&b.settings.SV) { log.VEventf(ctx, 3, "ingest data is too small (%d keys/%d bytes) for SSTable, adding via regular batch", item.stats.KeyCount, len(item.sstBytes)) ingestAsWriteBatch = true + ingestionPerformanceStats.AsWrites++ } req := &kvpb.AddSSTableRequest{ diff --git a/pkg/util/bulk/BUILD.bazel b/pkg/util/bulk/BUILD.bazel index e0635dd5fb0..1024fcda1e9 100644 --- 
a/pkg/util/bulk/BUILD.bazel +++ b/pkg/util/bulk/BUILD.bazel @@ -35,8 +35,6 @@ go_test( deps = [ ":bulk", "//pkg/ccl/backupccl/backuppb", - "//pkg/kv/bulk/bulkpb", - "//pkg/roachpb", "//pkg/util/tracing", "//pkg/util/tracing/tracingpb", "@com_github_gogo_protobuf//proto", diff --git a/pkg/util/bulk/tracing_aggregator_test.go b/pkg/util/bulk/tracing_aggregator_test.go index 71b3535a069..ab15dc09451 100644 --- a/pkg/util/bulk/tracing_aggregator_test.go +++ b/pkg/util/bulk/tracing_aggregator_test.go @@ -16,8 +16,6 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" - "github.com/cockroachdb/cockroach/pkg/kv/bulk/bulkpb" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" @@ -95,79 +93,3 @@ func TestAggregator(t *testing.T) { }, *es) }) } - -func TestIngestionPerformanceStatsAggregation(t *testing.T) { - tr := tracing.NewTracer() - ctx := context.Background() - - makeEvent := func(v int64, sendWaitByStore map[roachpb.StoreID]time.Duration) *bulkpb.IngestionPerformanceStats { - return &bulkpb.IngestionPerformanceStats{ - LogicalDataSize: v, - SSTDataSize: v, - BufferFlushes: v, - FlushesDueToSize: v, - Batches: v, - BatchesDueToRange: v, - BatchesDueToSize: v, - SplitRetries: v, - Splits: v, - Scatters: v, - ScatterMoved: v, - FillWait: time.Duration(v), - SortWait: time.Duration(v), - FlushWait: time.Duration(v), - BatchWait: time.Duration(v), - SendWait: time.Duration(v), - SplitWait: time.Duration(v), - ScatterWait: time.Duration(v), - CommitWait: time.Duration(v), - Duration: time.Duration(v), - SendWaitByStore: sendWaitByStore, - } - } - - assertAggContainsStats := func(t *testing.T, agg *bulk.TracingAggregator, expected *bulkpb.IngestionPerformanceStats) { - agg.ForEachAggregatedEvent(func(name string, event bulk.TracingAggregatorEvent) { - require.Equal(t, name, proto.MessageName(expected)) - var actual *bulkpb.IngestionPerformanceStats - var ok bool - if actual, ok = event.(*bulkpb.IngestionPerformanceStats); !ok { - t.Fatal("failed to cast event to expected type") - } - require.Equal(t, expected, actual) - }) - } - - // First, start a root tracing span with a tracing aggregator. - ctx, root := tr.StartSpanCtx(ctx, "root", tracing.WithRecording(tracingpb.RecordingVerbose)) - defer root.Finish() - agg := bulk.TracingAggregatorForContext(ctx) - ctx, withListener := tr.StartSpanCtx(ctx, "withListener", - tracing.WithEventListeners(agg), tracing.WithParent(root)) - defer withListener.Finish() - - // Second, start a child span on the root that also has its own tracing - // aggregator. - child1Agg := bulk.TracingAggregatorForContext(ctx) - child1Ctx, child1AggSp := tr.StartSpanCtx(ctx, "child1", - tracing.WithEventListeners(child1Agg), tracing.WithParent(withListener)) - defer child1AggSp.Finish() - - // In addition, start a child span on the first child span. - _, child1Child := tracing.ChildSpan(child1Ctx, "child1Child") - defer child1Child.Finish() - - // Finally, start a second child span on the root. - _, child2 := tracing.ChildSpan(ctx, "child2") - defer child2.Finish() - - // Record a structured event on all child spans. 
- child1AggSp.RecordStructured(makeEvent(1, map[roachpb.StoreID]time.Duration{1: 101})) - child1Child.RecordStructured(makeEvent(2, map[roachpb.StoreID]time.Duration{1: 102, 2: 202})) - child2.RecordStructured(makeEvent(3, map[roachpb.StoreID]time.Duration{2: 203, 3: 303})) - - // Verify that the root and child1 aggregators has the expected aggregated - // stats. - assertAggContainsStats(t, child1Agg, makeEvent(3, map[roachpb.StoreID]time.Duration{1: 203, 2: 202})) - assertAggContainsStats(t, agg, makeEvent(6, map[roachpb.StoreID]time.Duration{1: 203, 2: 405, 3: 303})) -}
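
The histograms added in this patch are carried between Combine calls as HistogramData snapshots rather than live hdrhistogram objects. The standalone sketch below is illustrative only and is not part of the patch: the sample latencies and printed quantiles are made up, and the bounds simply mirror the latency constants in ingestion_performance_stats.go. It walks through the same import, record, export round-trip that getCombinedHist performs, using only hdrhistogram calls that already appear in this change.

package main

import (
	"fmt"
	"time"

	"github.com/codahale/hdrhistogram"
)

func main() {
	// Bounds mirror the latency constants in ingestion_performance_stats.go.
	const sigFigs = 1
	minLatency := time.Millisecond.Nanoseconds()
	maxLatency := (100 * time.Second).Nanoseconds()

	// First flush: no prior histogram exists, so start a fresh one and record
	// this flush's batch wait (the value here is made up).
	hist := hdrhistogram.New(minLatency, maxLatency, sigFigs)
	_ = hist.RecordValue((50 * time.Millisecond).Nanoseconds())
	snap := hist.Export() // the counts that would be stored in a HistogramData

	// Later flush: rehydrate the stored snapshot, fold in the new observation,
	// and export again, mirroring what Combine does for BatchWaitHist.
	hist = hdrhistogram.Import(snap)
	_ = hist.RecordValue((80 * time.Millisecond).Nanoseconds())
	snap = hist.Export()

	// Reading back the merged snapshot yields the quantiles that String()
	// formats for the trace tags.
	merged := hdrhistogram.Import(snap)
	fmt.Printf("count=%d p50=%s p99=%s\n",
		merged.TotalCount(),
		time.Duration(merged.ValueAtQuantile(50)),
		time.Duration(merged.ValueAtQuantile(99)))
}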