Skip to content

Commit

Permalink
bulk: add histograms to ingestion performance stats
Browse files Browse the repository at this point in the history
This change adds two histograms to the IngestionPerformanceStats:
one that tracks the batch wait, and one that tracks
the SST data size. This change also adds a count of the number
of SSTs that are ingested as write batches.

Release note: None
  • Loading branch information
adityamaru committed Sep 26, 2023
1 parent 9cc17d7 commit 34eb57a
Show file tree
Hide file tree
Showing 9 changed files with 316 additions and 82 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ ALL_TESTS = [
"//pkg/jobs:jobs_test",
"//pkg/keys:keys_test",
"//pkg/keyvisualizer/spanstatsconsumer:spanstatsconsumer_test",
"//pkg/kv/bulk/bulkpb:bulkpb_test",
"//pkg/kv/bulk:bulk_test",
"//pkg/kv/kvclient/kvcoord:kvcoord_disallowed_imports_test",
"//pkg/kv/kvclient/kvcoord:kvcoord_test",
Expand Down Expand Up @@ -1266,6 +1267,7 @@ GO_TARGETS = [
"//pkg/keyvisualizer/spanstatskvaccessor:spanstatskvaccessor",
"//pkg/keyvisualizer:keyvisualizer",
"//pkg/kv/bulk/bulkpb:bulkpb",
"//pkg/kv/bulk/bulkpb:bulkpb_test",
"//pkg/kv/bulk:bulk",
"//pkg/kv/bulk:bulk_test",
"//pkg/kv/kvbase:kvbase",
Expand Down
24 changes: 22 additions & 2 deletions pkg/kv/bulk/bulkpb/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
load("@rules_proto//proto:defs.bzl", "proto_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")

proto_library(
Expand Down Expand Up @@ -27,7 +27,10 @@ go_proto_library(

go_library(
name = "bulkpb",
srcs = ["ingestion_performance_stats.go"],
srcs = [
"bulk.go",
"ingestion_performance_stats.go",
],
embed = [":bulkpb_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/bulk/bulkpb",
visibility = ["//visibility:public"],
Expand All @@ -38,7 +41,24 @@ go_library(
"//pkg/util/humanizeutil",
"//pkg/util/log",
"@com_github_cockroachdb_redact//:redact",
"@com_github_codahale_hdrhistogram//:hdrhistogram",
"@com_github_gogo_protobuf//proto",
"@io_opentelemetry_go_otel//attribute",
],
)

# bulkpb_test builds the tests in ingestion_performance_stats_test.go. It
# embeds the :bulkpb library target and passes an explicit 295s timeout to the
# Go test binary.
go_test(
name = "bulkpb_test",
srcs = ["ingestion_performance_stats_test.go"],
args = ["-test.timeout=295s"],
embed = [":bulkpb"],
deps = [
"//pkg/roachpb",
"//pkg/util/bulk",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"@com_github_codahale_hdrhistogram//:hdrhistogram",
"@com_github_gogo_protobuf//proto",
"@com_github_stretchr_testify//require",
],
)
60 changes: 60 additions & 0 deletions pkg/kv/bulk/bulkpb/bulk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package bulkpb

import (
"fmt"
"strings"
"time"

"github.com/codahale/hdrhistogram"
)

// HistogramDataType identifies the kind of values stored in a HistogramData.
// HistogramData.String uses it to pick the unit the values are scaled to when
// they are formatted.
type HistogramDataType int

// The numeric values of these constants are stored in the data_type field of
// the HistogramData proto message, so existing entries must not be reordered.
const (
// HistogramDataTypeLatency marks a histogram of latencies; its values are
// divided by time.Second when formatted.
HistogramDataTypeLatency HistogramDataType = iota
// HistogramDataTypeBytes marks a histogram of byte sizes; its values are
// divided by mb when formatted.
HistogramDataTypeBytes
)

// mb is the number of bytes in one megabyte (2^20). It is used to scale byte
// counts to MB for display.
const mb = 1 << 20

// String implements the fmt.Stringer interface.
//
// It rebuilds an hdrhistogram.Histogram from the snapshot stored in h and
// renders its summary statistics (min, max, p5, p50, p90, p99, p99.9, mean and
// sample count), one "name: value" pair per line. Values are scaled according
// to h.DataType: latency values are divided by time.Second and byte values are
// divided by mb. A nil receiver or an unrecognized DataType yields the empty
// string.
func (h *HistogramData) String() string {
	// Histogram fields on IngestionPerformanceStats are pointers that stay nil
	// until a value has been recorded, and String is called on them directly,
	// so a nil receiver must not dereference h.
	if h == nil {
		return ""
	}

	var denominator float64
	switch h.DataType {
	case HistogramDataTypeLatency:
		denominator = float64(time.Second)
	case HistogramDataTypeBytes:
		denominator = float64(mb)
	default:
		// Unknown data types have no unit to scale to; print nothing.
		return ""
	}

	hist := hdrhistogram.Import(&hdrhistogram.Snapshot{
		LowestTrackableValue:  h.LowestTrackableValue,
		HighestTrackableValue: h.HighestTrackableValue,
		SignificantFigures:    h.SignificantFigures,
		Counts:                h.Counts,
	})

	// scaled converts a raw recorded value into the display unit.
	scaled := func(v int64) float64 { return float64(v) / denominator }

	var b strings.Builder
	fmt.Fprintf(&b, "min: %.6f\n", scaled(hist.Min()))
	fmt.Fprintf(&b, "max: %.6f\n", scaled(hist.Max()))
	fmt.Fprintf(&b, "p5: %.6f\n", scaled(hist.ValueAtQuantile(5)))
	fmt.Fprintf(&b, "p50: %.6f\n", scaled(hist.ValueAtQuantile(50)))
	fmt.Fprintf(&b, "p90: %.6f\n", scaled(hist.ValueAtQuantile(90)))
	fmt.Fprintf(&b, "p99: %.6f\n", scaled(hist.ValueAtQuantile(99)))
	fmt.Fprintf(&b, "p99_9: %.6f\n", scaled(hist.ValueAtQuantile(99.9)))
	// The mean is kept in float64 like every other statistic; narrowing it to
	// float32 would discard digits that the %.6f format prints.
	fmt.Fprintf(&b, "mean: %.6f\n", hist.Mean()/denominator)
	fmt.Fprintf(&b, "count: %d\n", hist.TotalCount())
	return b.String()
}
22 changes: 22 additions & 0 deletions pkg/kv/bulk/bulkpb/bulkpb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@ option go_package = "github.com/cockroachdb/cockroach/pkg/kv/bulk/bulkpb";
import "gogoproto/gogo.proto";
import "util/hlc/timestamp.proto";

// HistogramData is a serializable snapshot of a histogram. Fields 1-4 carry
// the same names as the fields of hdrhistogram.Snapshot, which the Go code
// converts to and from.
message HistogramData {
// The generated String method is disabled; the Go package provides its own.
option (gogoproto.goproto_stringer) = false;

// data_type identifies what the recorded values represent (latency or
// bytes), as a HistogramDataType.
int32 data_type = 5 [(gogoproto.casttype) = "HistogramDataType"];
// lowest_trackable_value is the lower bound of the histogram's range.
int64 lowest_trackable_value = 1;
// highest_trackable_value is the upper bound of the histogram's range.
int64 highest_trackable_value = 2;
// significant_figures is the precision the histogram was created with.
int64 significant_figures = 3;
// counts holds the histogram's bucket counts.
repeated int64 counts = 4;
}

// IngestionPerformanceStats is a message containing information about the
// creation of SSTables by an SSTBatcher or BufferingAdder.
message IngestionPerformanceStats {
Expand Down Expand Up @@ -67,11 +77,15 @@ message IngestionPerformanceStats {
// BatchWait is the time spent flushing batches (inc split/scatter/send).
int64 batch_wait = 14 [(gogoproto.casttype) = "time.Duration"];

// BatchWaitHist is a histogram of the time spent flushing batches.
HistogramData batch_wait_hist = 24;

// SendWait is the time spent sending batches (addsstable+retries)
int64 send_wait = 15 [(gogoproto.casttype) = "time.Duration"];

// SplitWait is the time spent splitting.
int64 split_wait = 16 [(gogoproto.casttype) = "time.Duration"];

// ScatterWait is the time spent scattering.
int64 scatter_wait = 17 [(gogoproto.casttype) = "time.Duration"];

Expand All @@ -91,4 +105,12 @@ message IngestionPerformanceStats {

// SendWaitByStore is the time spent sending batches to each store.
map<int32, int64> send_wait_by_store = 20 [(gogoproto.castkey) = "github.com/cockroachdb/cockroach/pkg/roachpb.StoreID", (gogoproto.castvalue) = "time.Duration"];

// AsWrites is the number of AddSSTable requests that are ingested as a write
// batch.
int64 as_writes = 25;

// SSTSizeHist is a histogram of the sizes of the SSTs sent to KV for
// ingestion via AddSSTable requests.
HistogramData sst_size_hist = 26;
}
51 changes: 51 additions & 0 deletions pkg/kv/bulk/bulkpb/ingestion_performance_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,22 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/redact"
"github.com/codahale/hdrhistogram"
"github.com/gogo/protobuf/proto"
"go.opentelemetry.io/otel/attribute"
)

var _ bulk.TracingAggregatorEvent = (*IngestionPerformanceStats)(nil)

// Parameters passed to hdrhistogram.New when getCombinedHist creates a new
// histogram.
const (
// sigFigs is the number of significant figures for new histograms.
sigFigs = 1
// minLatency and maxLatency bound the range of latency histograms; they
// are converted to nanoseconds when the histogram is created.
minLatency = time.Millisecond
maxLatency = 100 * time.Second

// minBytes and maxBytes bound the range of byte-size histograms.
minBytes = 1024 // 1 KB
maxBytes = 256 * 1024 * 1024 // 256 MB
)

// Identity implements the TracingAggregatorEvent interface.
func (s *IngestionPerformanceStats) Identity() bulk.TracingAggregatorEvent {
stats := IngestionPerformanceStats{
Expand All @@ -40,6 +50,38 @@ func (s *IngestionPerformanceStats) Identity() bulk.TracingAggregatorEvent {
return &stats
}

// getCombinedHist returns a new HistogramData holding the samples of
// currentHist plus recordValue.
//
// If currentHist is nil, a fresh histogram is created with bounds chosen by
// dataType: [minLatency, maxLatency] in nanoseconds for
// HistogramDataTypeLatency and [minBytes, maxBytes] for
// HistogramDataTypeBytes. If currentHist is nil and dataType is not one of
// those, there are no bounds to build a histogram from and nil is returned.
//
// The returned HistogramData is always tagged with dataType, regardless of
// the DataType stored in currentHist.
func getCombinedHist(
	currentHist *HistogramData, recordValue int64, dataType HistogramDataType,
) *HistogramData {
	var hist *hdrhistogram.Histogram
	switch {
	case currentHist != nil:
		hist = hdrhistogram.Import(&hdrhistogram.Snapshot{
			LowestTrackableValue:  currentHist.LowestTrackableValue,
			HighestTrackableValue: currentHist.HighestTrackableValue,
			SignificantFigures:    currentHist.SignificantFigures,
			Counts:                currentHist.Counts,
		})
	case dataType == HistogramDataTypeLatency:
		hist = hdrhistogram.New(minLatency.Nanoseconds(),
			maxLatency.Nanoseconds(), sigFigs)
	case dataType == HistogramDataTypeBytes:
		hist = hdrhistogram.New(minBytes, maxBytes, sigFigs)
	default:
		// Without this guard hist would stay nil and RecordValue below would
		// dereference a nil histogram.
		return nil
	}

	// The error from RecordValue is intentionally discarded so that a single
	// unrecordable sample does not fail stats aggregation.
	// NOTE(review): this presumably means samples outside the trackable range
	// are dropped from the histogram without any signal -- confirm that is
	// acceptable.
	_ = hist.RecordValue(recordValue)

	// Return the snapshot of this new merged histogram.
	cumulativeSnapshot := hist.Export()
	return &HistogramData{
		DataType:              dataType,
		LowestTrackableValue:  cumulativeSnapshot.LowestTrackableValue,
		HighestTrackableValue: cumulativeSnapshot.HighestTrackableValue,
		SignificantFigures:    cumulativeSnapshot.SignificantFigures,
		Counts:                cumulativeSnapshot.Counts,
	}
}

// Combine implements the TracingAggregatorEvent interface.
func (s *IngestionPerformanceStats) Combine(other bulk.TracingAggregatorEvent) {
otherStats, ok := other.(*IngestionPerformanceStats)
Expand All @@ -66,6 +108,12 @@ func (s *IngestionPerformanceStats) Combine(other bulk.TracingAggregatorEvent) {
s.SplitWait += otherStats.SplitWait
s.ScatterWait += otherStats.ScatterWait
s.CommitWait += otherStats.CommitWait
s.AsWrites += otherStats.AsWrites

s.BatchWaitHist = getCombinedHist(s.BatchWaitHist,
otherStats.BatchWait.Nanoseconds(), HistogramDataTypeLatency)
s.SstSizeHist = getCombinedHist(s.SstSizeHist,
otherStats.SSTDataSize, HistogramDataTypeBytes)

// Duration should not be used in throughput calculations as adding durations
// of multiple flushes does not account for concurrent execution of these
Expand Down Expand Up @@ -126,6 +174,7 @@ func (s *IngestionPerformanceStats) String() string {
if s.SSTDataSize > 0 {
sstDataSizeMB := float64(s.SSTDataSize) / mb
b.WriteString(fmt.Sprintf("sst_data_size: %.2f MB\n", sstDataSizeMB))
b.WriteString(fmt.Sprintf("sst_data_hist:\n%s\n", s.SstSizeHist.String()))

if !s.CurrentFlushTime.IsEmpty() && !s.LastFlushTime.IsEmpty() {
duration := s.CurrentFlushTime.GoTime().Sub(s.LastFlushTime.GoTime())
Expand All @@ -138,6 +187,7 @@ func (s *IngestionPerformanceStats) String() string {
timeString(&b, "sort_wait", s.SortWait)
timeString(&b, "flush_wait", s.FlushWait)
timeString(&b, "batch_wait", s.BatchWait)
b.WriteString(fmt.Sprintf("batch_wait_hist:\n%s\n", s.BatchWaitHist.String()))
timeString(&b, "send_wait", s.SendWait)
timeString(&b, "split_wait", s.SplitWait)
timeString(&b, "scatter_wait", s.ScatterWait)
Expand All @@ -146,6 +196,7 @@ func (s *IngestionPerformanceStats) String() string {
b.WriteString(fmt.Sprintf("splits: %d\n", s.Splits))
b.WriteString(fmt.Sprintf("scatters: %d\n", s.Scatters))
b.WriteString(fmt.Sprintf("scatter_moved: %d\n", s.ScatterMoved))
b.WriteString(fmt.Sprintf("as_writes: %d\n", s.AsWrites))

// Sort store send wait by IDs before adding them as tags.
ids := make(roachpb.StoreIDSlice, 0, len(s.SendWaitByStore))
Expand Down
Loading

0 comments on commit 34eb57a

Please sign in to comment.