Skip to content

Commit

Permalink
Added new builtin generator function crdb_internal.pebble_metrics()
Browse files Browse the repository at this point in the history
This new builtin is used to gather specific pebble metrics for a node
and store id (within an given keyspan). The builtin returns information
about the different types of keys (including snapshot pinned keys) as
well as bytes.

Informs: #94659
Release-note: None
  • Loading branch information
craig[bot] authored and raggar committed Jul 28, 2023
1 parent c986d84 commit 9f47020
Show file tree
Hide file tree
Showing 14 changed files with 222 additions and 0 deletions.
2 changes: 2 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -3273,6 +3273,8 @@ active for the current transaction.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.payloads_for_trace"></a><code>crdb_internal.payloads_for_trace(trace_id: <a href="int.html">int</a>) &rarr; tuple{int AS span_id, string AS payload_type, jsonb AS payload_jsonb}</code></td><td><span class="funcdesc"><p>Returns the payload(s) of the requested trace.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.pebble_metrics"></a><code>crdb_internal.pebble_metrics(node_id: <a href="int.html">int</a>, store_id: <a href="int.html">int</a>, start_key: <a href="bytes.html">bytes</a>, end_key: <a href="bytes.html">bytes</a>) &rarr; tuple{int AS node_id, int AS store_id, int AS level, int AS snapshot_pinned_keys, int AS snapshot_pinned_keys_<a href="bytes.html">bytes</a>, int AS point_key_delete_count, int AS point_key_delete_<a href="bytes.html">bytes</a>, int AS point_key_set_count, int AS point_key_set_<a href="bytes.html">bytes</a>, int AS range_delete_count, int AS range_delete_<a href="bytes.html">bytes</a>, int AS range_key_set_count, int AS range_key_set_<a href="bytes.html">bytes</a>, int AS range_key_delete_count, int AS range_key_delete_bytes}</code></td><td><span class="funcdesc"><p>Returns statistics for a pebble store in the range start_key and end_key for the provided node id.</p>
</span></td><td>Stable</td></tr>
<tr><td><a name="crdb_internal.pretty_key"></a><code>crdb_internal.pretty_key(raw_key: <a href="bytes.html">bytes</a>, skip_fields: <a href="int.html">int</a>) &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>This function is used only by CockroachDB’s developers for testing purposes.</p>
</span></td><td>Immutable</td></tr>
<tr><td><a name="crdb_internal.pretty_span"></a><code>crdb_internal.pretty_span(raw_key_start: <a href="bytes.html">bytes</a>, raw_key_end: <a href="bytes.html">bytes</a>, skip_fields: <a href="int.html">int</a>) &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>This function is used only by CockroachDB’s developers for testing purposes.</p>
Expand Down
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ message GetTableMetricsResponse{
repeated storage.enginepb.SSTableMetricsInfo table_metrics = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "TableMetrics"];
}

message GetAdvancedPebbleMetricsRequest {
StoreRequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
roachpb.Span span = 2 [(gogoproto.nullable) = false];
}

message GetAdvancedPebbleMetricsResponse {
repeated storage.enginepb.AdvancedPebbleMetrics advanced_metrics = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "AdvancedPebbleMetrics"];
}

message CompactEngineSpanResponse {
}

Expand Down
26 changes: 26 additions & 0 deletions pkg/kv/kvserver/storage_engine_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,32 @@ func (c *StorageEngineClient) GetTableMetrics(
return resp.TableMetrics, nil
}

// GetAdvancedPebbleMetrics is a tree.GetAdvancedPebbleMetricsFunc
func (c *StorageEngineClient) GetAdvancedPebbleMetrics(
ctx context.Context, nodeID, storeID int32, startKey, endKey []byte,
) ([]enginepb.AdvancedPebbleMetrics, error) {
conn, err := c.nd.Dial(ctx, roachpb.NodeID(nodeID), rpc.DefaultClass)
if err != nil {
return []enginepb.AdvancedPebbleMetrics{}, errors.Wrapf(err, "could not dial node ID %d", nodeID)
}

client := NewPerStoreClient(conn)
req := &GetAdvancedPebbleMetricsRequest{
StoreRequestHeader: StoreRequestHeader{
NodeID: roachpb.NodeID(nodeID),
StoreID: roachpb.StoreID(storeID),
},
Span: roachpb.Span{Key: roachpb.Key(startKey), EndKey: roachpb.Key(endKey)},
}

resp, err := client.GetAdvancedPebbleMetrics(ctx, req)

if err != nil {
return []enginepb.AdvancedPebbleMetrics{}, err
}
return resp.AdvancedPebbleMetrics, nil
}

// SetCompactionConcurrency is a tree.CompactionConcurrencyFunc.
func (c *StorageEngineClient) SetCompactionConcurrency(
ctx context.Context, nodeID, storeID int32, compactionConcurrency uint64,
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/storage_services.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,6 @@ service PerReplica {
service PerStore {
rpc CompactEngineSpan(cockroach.kv.kvserver.CompactEngineSpanRequest) returns (cockroach.kv.kvserver.CompactEngineSpanResponse) {}
rpc GetTableMetrics(cockroach.kv.kvserver.GetTableMetricsRequest) returns (cockroach.kv.kvserver.GetTableMetricsResponse) {}
rpc GetAdvancedPebbleMetrics(cockroach.kv.kvserver.GetAdvancedPebbleMetricsRequest) returns (cockroach.kv.kvserver.GetAdvancedPebbleMetricsResponse) {}
rpc SetCompactionConcurrency(cockroach.kv.kvserver.CompactionConcurrencyRequest) returns (cockroach.kv.kvserver.CompactionConcurrencyResponse) {}
}
18 changes: 18 additions & 0 deletions pkg/kv/kvserver/stores_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,24 @@ func (is Server) GetTableMetrics(
return resp, err
}

func (is Server) GetAdvancedPebbleMetrics(
ctx context.Context, req *GetAdvancedPebbleMetricsRequest,
) (*GetAdvancedPebbleMetricsResponse, error) {
resp := &GetAdvancedPebbleMetricsResponse{}
err := is.execStoreCommand(ctx, req.StoreRequestHeader,
func(ctx context.Context, s *Store) error {
metrics, err := s.TODOEngine().GetAdvancedPebbleMetrics(req.Span.Key, req.Span.EndKey)

if err != nil {
return err
}

resp.AdvancedPebbleMetrics = metrics
return nil
})
return resp, err
}

// SetCompactionConcurrency implements PerStoreServer. It changes the compaction
// concurrency of a store. While SetCompactionConcurrency is safe for concurrent
// use, it adds uncertainty about the compaction concurrency actually set on
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1377,6 +1377,10 @@ type ExecutorConfig struct {
// overlap with a key range for a specified node and store.
GetTableMetricsFunc eval.GetTableMetricsFunc

// GetAdvancedPebbleMetrics is used to gather information about the types of
// keys (including snapshot pinned keys) at each level of a node store.
GetAdvancedPebbleMetricsFunc eval.GetAdvancedPebbleMetricsFunc

// TraceCollector is used to contact all live nodes in the cluster, and
// collect trace spans from their inflight node registries.
TraceCollector *collector.TraceCollector
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (evalCtx *extendedEvalContext) copyFromExecCfg(execCfg *ExecutorConfig) {
evalCtx.CompactEngineSpan = execCfg.CompactEngineSpanFunc
evalCtx.SetCompactionConcurrency = execCfg.CompactionConcurrencyFunc
evalCtx.GetTableMetrics = execCfg.GetTableMetricsFunc
evalCtx.GetAdvancedPebbleMetrics = execCfg.GetAdvancedPebbleMetricsFunc
evalCtx.TestingKnobs = execCfg.EvalContextTestingKnobs
evalCtx.ClusterID = execCfg.NodeInfo.LogicalClusterID()
evalCtx.ClusterName = execCfg.RPCContext.ClusterName()
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sem/builtins/fixed_oids.go
Original file line number Diff line number Diff line change
Expand Up @@ -2436,6 +2436,7 @@ var builtinOidsArray = []string{
2463: `workload_index_recs(timestamptz: timestamptz) -> string`,
2464: `workload_index_recs(budget: string) -> string`,
2465: `workload_index_recs(timestamptz: timestamptz, budget: string) -> string`,
2466: `crdb_internal.pebble_metrics(node_id: int, store_id: int, start_key: bytes, end_key: bytes) -> tuple{int AS node_id, int AS store_id, int AS level, int AS snapshot_pinned_keys, int AS snapshot_pinned_keys_bytes, int AS point_key_delete_count, int AS point_key_delete_bytes, int AS point_key_set_count, int AS point_key_set_bytes, int AS range_delete_count, int AS range_delete_bytes, int AS range_key_set_count, int AS range_key_set_bytes, int AS range_key_delete_count, int AS range_key_delete_bytes}`,
}

var builtinOidsBySignature map[string]oid.Oid
Expand Down
123 changes: 123 additions & 0 deletions pkg/sql/sem/builtins/generator_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,23 @@ The last argument is a JSONB object containing the following optional fields:
volatility.Stable,
),
),
"crdb_internal.pebble_metrics": makeBuiltin(
tree.FunctionProperties{
Category: builtinconstants.CategorySystemInfo,
},
makeGeneratorOverload(
tree.ParamTypes{
{Name: "node_id", Typ: types.Int},
{Name: "store_id", Typ: types.Int},
{Name: "start_key", Typ: types.Bytes},
{Name: "end_key", Typ: types.Bytes},
},
advancedPebbleMetricsGeneratorType,
makeAdvancedPebbleMetricsGenerator,
"Returns statistics for a pebble store in the range start_key and end_key for the provided node id.",
volatility.Stable,
),
),
}

var decodePlanGistGeneratorType = types.String
Expand Down Expand Up @@ -3272,6 +3289,112 @@ func makeTableMetricsGenerator(
return newTableMetricsIterator(evalCtx, nodeID, storeID, start, end), nil
}

type advancedPebbleMetricsIterator struct {
metrics []enginepb.AdvancedPebbleMetrics
evalCtx *eval.Context

iterIdx int
nodeID int32
storeID int32
start []byte
end []byte
}

var advancedPebbleMetricsGeneratorType = types.MakeLabeledTuple(
[]*types.T{types.Int, types.Int, types.Int, types.Int, types.Int, types.Int, types.Int, types.Int,
types.Int, types.Int, types.Int, types.Int, types.Int, types.Int, types.Int},
[]string{
"node_id",
"store_id",
"level",
"snapshot_pinned_keys",
"snapshot_pinned_keys_bytes",
"point_key_delete_count",
"point_key_delete_bytes",
"point_key_set_count",
"point_key_set_bytes",
"range_delete_count",
"range_delete_bytes",
"range_key_set_count",
"range_key_set_bytes",
"range_key_delete_count",
"range_key_delete_bytes",
},
)

var _ eval.ValueGenerator = (*advancedPebbleMetricsIterator)(nil)

func newAdvancedPebbleMetricsIterator(
evalCtx *eval.Context, nodeID, storeID int32, start, end []byte,
) *advancedPebbleMetricsIterator {
return &advancedPebbleMetricsIterator{evalCtx: evalCtx, nodeID: nodeID, storeID: storeID, start: start, end: end}
}

// Start implements the tree.ValueGenerator interface.
func (apmi *advancedPebbleMetricsIterator) Start(ctx context.Context, _ *kv.Txn) error {
var err error
apmi.metrics, err = apmi.evalCtx.GetAdvancedPebbleMetrics(ctx, apmi.nodeID, apmi.storeID, apmi.start, apmi.end)
if err != nil {
err = errors.Wrapf(err, "getting table metrics for node %d store %d", apmi.nodeID, apmi.storeID)
}

return err
}

// Next implements the tree.ValueGenerator interface.
func (apmi *advancedPebbleMetricsIterator) Next(_ context.Context) (bool, error) {
apmi.iterIdx++
return apmi.iterIdx <= len(apmi.metrics), nil
}

// Values implements the tree.ValueGenerator interface.
func (apmi *advancedPebbleMetricsIterator) Values() (tree.Datums, error) {
metricsInfo := apmi.metrics[apmi.iterIdx-1]

return tree.Datums{
tree.NewDInt(tree.DInt(apmi.nodeID)),
tree.NewDInt(tree.DInt(apmi.storeID)),
tree.NewDInt(tree.DInt(metricsInfo.SnapshotPinnedKeys)),
tree.NewDInt(tree.DInt(metricsInfo.SnapshotPinnedKeysBytes)),
tree.NewDInt(tree.DInt(metricsInfo.PointKeyDeleteCount)),
tree.NewDInt(tree.DInt(metricsInfo.PointKeyDeleteBytes)),
tree.NewDInt(tree.DInt(metricsInfo.PointKeySetCount)),
tree.NewDInt(tree.DInt(metricsInfo.PointKeySetBytes)),
tree.NewDInt(tree.DInt(metricsInfo.RangeDeleteCount)),
tree.NewDInt(tree.DInt(metricsInfo.RangeDeleteBytes)),
tree.NewDInt(tree.DInt(metricsInfo.RangeKeySetCount)),
tree.NewDInt(tree.DInt(metricsInfo.RangeKeySetBytes)),
tree.NewDInt(tree.DInt(metricsInfo.RangeKeyDeleteCount)),
tree.NewDInt(tree.DInt(metricsInfo.RangeKeyDeleteBytes)),
}, nil
}

// Close implements the tree.ValueGenerator interface.
func (tmi *advancedPebbleMetricsIterator) Close(_ context.Context) {}

// ResolvedType implements the tree.ValueGenerator interface.
func (tmi *advancedPebbleMetricsIterator) ResolvedType() *types.T {
return tableMetricsGeneratorType
}

func makeAdvancedPebbleMetricsGenerator(
ctx context.Context, evalCtx *eval.Context, args tree.Datums,
) (eval.ValueGenerator, error) {
isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx)
if err != nil {
return nil, err
}
if !isAdmin {
return nil, errInsufficientPriv
}
nodeID := int32(tree.MustBeDInt(args[0]))
storeID := int32(tree.MustBeDInt(args[1]))
start := []byte(tree.MustBeDBytes(args[2]))
end := []byte(tree.MustBeDBytes(args[3]))

return newAdvancedPebbleMetricsIterator(evalCtx, nodeID, storeID, start, end), nil
}

var tableSpanStatsGeneratorType = types.MakeLabeledTuple(
[]*types.T{types.Int, types.Int, types.Int, types.Int, types.Int, types.Int, types.Float},
[]string{"database_id", "table_id", "range_count", "approximate_disk_bytes", "live_bytes", "total_bytes", "live_percentage"},
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/sem/eval/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ type Context struct {
// GetTableMetrics is used in crdb_internal.sstable_metrics.
GetTableMetrics GetTableMetricsFunc

// GetAdvancedPebbleMetrics is used in crdb_internal.pebble_metrics.
GetAdvancedPebbleMetrics GetAdvancedPebbleMetricsFunc

// SetCompactionConcurrency is used to change the compaction concurrency of
// a store.
SetCompactionConcurrency SetCompactionConcurrencyFunc
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/sem/eval/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,12 @@ type GetTableMetricsFunc func(
ctx context.Context, nodeID, storeID int32, startKey, endKey []byte,
) ([]enginepb.SSTableMetricsInfo, error)

// GetAdvancedPebbleMetrics is used to retrieve pebble metrics on a key span
// (end-exclusive) at the given (nodeID, storeID).
type GetAdvancedPebbleMetricsFunc func(
ctx context.Context, nodeID, storeID int32, startKey, endKey []byte,
) ([]enginepb.AdvancedPebbleMetrics, error)

// SetCompactionConcurrencyFunc is used to change the compaction concurrency of a
// store.
type SetCompactionConcurrencyFunc func(
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,8 @@ type Engine interface {
// CompactRange ensures that the specified range of key value pairs is
// optimized for space efficiency.
CompactRange(start, end roachpb.Key) error
// GetAdvancedPebbleMetrics returns key level statistics for each level of a pebble store (that overlap start and end).
GetAdvancedPebbleMetrics(start, end roachpb.Key) ([]enginepb.AdvancedPebbleMetrics, error)
// GetTableMetrics returns information about sstables that overlap start and end.
GetTableMetrics(start, end roachpb.Key) ([]enginepb.SSTableMetricsInfo, error)
// RegisterFlushCompletedCallback registers a callback that will be run for
Expand Down
17 changes: 17 additions & 0 deletions pkg/storage/enginepb/rocksdb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,23 @@ message SSTableMetricsInfo {
uint64 approximate_span_bytes = 4 [(gogoproto.customname) = "ApproximateSpanBytes"];
}

message AdvancedPebbleMetrics {
// level is the lsm tree level the metrics are found.
int32 level = 1;
uint64 snapshot_pinned_keys = 2 [(gogoproto.customname) = "SnapshotPinnedKeys"];
uint64 snapshot_pinnedKeys_bytes = 3 [(gogoproto.customname) = "SnapshotPinnedKeysBytes"];
uint64 point_key_delete_count = 4 [(gogoproto.customname) = "PointKeyDeleteCount"];
uint64 point_key_delete_bytes = 5 [(gogoproto.customname) = "PointKeyDeleteBytes"];
uint64 point_key_set_count = 6 [(gogoproto.customname) = "PointKeySetCount"];
uint64 point_key_set_bytes = 7 [(gogoproto.customname) = "PointKeySetBytes"];
uint64 range_delete_count = 8 [(gogoproto.customname) = "RangeDeleteCount"];
uint64 range_delete_bytes = 9 [(gogoproto.customname) = "RangeDeleteBytes"];
uint64 range_key_set_count = 10 [(gogoproto.customname) = "RangeKeySetCount"];
uint64 range_key_set_bytes = 11 [(gogoproto.customname) = "RangeKeySetBytes"];
uint64 range_key_delete_count = 12 [(gogoproto.customname) = "RangeKeyDeleteCount"];
uint64 range_key_delete_bytes = 13 [(gogoproto.customname) = "RangeKeyDeleteBytes"];
}

// SSTUserProperties contains the user-added properties of a single sstable.
message SSTUserProperties {
string path = 1;
Expand Down
9 changes: 9 additions & 0 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -2063,6 +2063,15 @@ func (p *Pebble) GetTableMetrics(start, end roachpb.Key) ([]enginepb.SSTableMetr
return metricsInfo, nil
}

// GetAdvancedPebbleMetrics implements the Engine interface.
func (p *Pebble) GetAdvancedPebbleMetrics(
start, end roachpb.Key,
) ([]enginepb.AdvancedPebbleMetrics, error) {
fmt.Println("START", start)
fmt.Println("END", end)
return []enginepb.AdvancedPebbleMetrics{}, nil
}

// ApproximateDiskBytes implements the Engine interface.
func (p *Pebble) ApproximateDiskBytes(
from, to roachpb.Key,
Expand Down

0 comments on commit 9f47020

Please sign in to comment.