sql: Add new generator crdb_internal.scan_storage_internal_keys()
This new builtin is used to gather specific Pebble metrics for a node
and store ID, within a given keyspan. The builtin returns counts of the
different types of internal keys (including snapshot-pinned keys) at
each LSM level, as well as the bytes pinned by snapshots.

Informs: #94659
Release note: None
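
As a usage sketch (the node, store, and keyspan values below are illustrative), the builtin is queried like any other generator; the optional fifth argument caps the scan rate in megabytes per second, defaulting to 10 when omitted:

SELECT level, snapshot_pinned_keys, point_key_set_count, range_delete_count
FROM crdb_internal.scan_storage_internal_keys(1, 1, '\x61'::BYTES, '\x7a'::BYTES, 10);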
craig[bot] authored and raggar committed Aug 10, 2023
1 parent a7f0434 commit a2bf900
Showing 14 changed files with 276 additions and 10 deletions.
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/api.proto
@@ -89,6 +89,16 @@ message GetTableMetricsResponse{
repeated storage.enginepb.SSTableMetricsInfo table_metrics = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "TableMetrics"];
}

message ScanStorageInternalKeysRequest {
StoreRequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
roachpb.Span span = 2 [(gogoproto.nullable) = false];
int64 megabytes_per_second = 3 [(gogoproto.customname) = "MegabytesPerSecond"];
}

message ScanStorageInternalKeysResponse {
repeated storage.enginepb.StorageInternalKeysMetrics advanced_metrics = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "AdvancedPebbleMetrics"];
}

message CompactEngineSpanResponse {
}

27 changes: 27 additions & 0 deletions pkg/kv/kvserver/storage_engine_client.go
@@ -76,6 +76,33 @@ func (c *StorageEngineClient) GetTableMetrics(
return resp.TableMetrics, nil
}

// ScanStorageInternalKeys is an eval.ScanStorageInternalKeysFunc.
func (c *StorageEngineClient) ScanStorageInternalKeys(
ctx context.Context, nodeID, storeID int32, startKey, endKey []byte, megabytesPerSecond int64,
) ([]enginepb.StorageInternalKeysMetrics, error) {
conn, err := c.nd.Dial(ctx, roachpb.NodeID(nodeID), rpc.DefaultClass)
if err != nil {
return []enginepb.StorageInternalKeysMetrics{}, errors.Wrapf(err, "could not dial node ID %d", nodeID)
}

client := NewPerStoreClient(conn)
req := &ScanStorageInternalKeysRequest{
StoreRequestHeader: StoreRequestHeader{
NodeID: roachpb.NodeID(nodeID),
StoreID: roachpb.StoreID(storeID),
},
Span: roachpb.Span{Key: roachpb.Key(startKey), EndKey: roachpb.Key(endKey)},
MegabytesPerSecond: megabytesPerSecond,
}

resp, err := client.ScanStorageInternalKeys(ctx, req)
if err != nil {
return []enginepb.StorageInternalKeysMetrics{}, err
}
return resp.AdvancedPebbleMetrics, nil
}
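
For illustration, a minimal caller sketch (the helper name and span below are hypothetical; it assumes code living in this package alongside the client):

// scanStoreSpan is a hypothetical helper that scans an illustrative span of
// the given store at a 10 MB/s rate limit and returns the per-level metrics.
func scanStoreSpan(
	ctx context.Context, c *StorageEngineClient, nodeID, storeID int32,
) ([]enginepb.StorageInternalKeysMetrics, error) {
	startKey, endKey := []byte{0x00}, []byte{0xff, 0xff} // illustrative span
	return c.ScanStorageInternalKeys(ctx, nodeID, storeID, startKey, endKey, 10 /* MB/s */)
}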

// SetCompactionConcurrency is a tree.CompactionConcurrencyFunc.
func (c *StorageEngineClient) SetCompactionConcurrency(
ctx context.Context, nodeID, storeID int32, compactionConcurrency uint64,
1 change: 1 addition & 0 deletions pkg/kv/kvserver/storage_services.proto
@@ -44,5 +44,6 @@ service PerReplica {
service PerStore {
rpc CompactEngineSpan(cockroach.kv.kvserver.CompactEngineSpanRequest) returns (cockroach.kv.kvserver.CompactEngineSpanResponse) {}
rpc GetTableMetrics(cockroach.kv.kvserver.GetTableMetricsRequest) returns (cockroach.kv.kvserver.GetTableMetricsResponse) {}
rpc ScanStorageInternalKeys(cockroach.kv.kvserver.ScanStorageInternalKeysRequest) returns (cockroach.kv.kvserver.ScanStorageInternalKeysResponse) {}
rpc SetCompactionConcurrency(cockroach.kv.kvserver.CompactionConcurrencyRequest) returns (cockroach.kv.kvserver.CompactionConcurrencyResponse) {}
}
18 changes: 18 additions & 0 deletions pkg/kv/kvserver/stores_server.go
@@ -173,6 +173,24 @@ func (is Server) GetTableMetrics(
return resp, err
}

// ScanStorageInternalKeys implements PerStoreServer. It scans the store's
// storage engine within the requested span and returns statistics about the
// internal keys it finds.
func (is Server) ScanStorageInternalKeys(
ctx context.Context, req *ScanStorageInternalKeysRequest,
) (*ScanStorageInternalKeysResponse, error) {
resp := &ScanStorageInternalKeysResponse{}
err := is.execStoreCommand(ctx, req.StoreRequestHeader,
func(ctx context.Context, s *Store) error {
metrics, err := s.TODOEngine().ScanStorageInternalKeys(req.Span.Key, req.Span.EndKey, req.MegabytesPerSecond)
if err != nil {
return err
}

resp.AdvancedPebbleMetrics = metrics
return nil
})
return resp, err
}

// SetCompactionConcurrency implements PerStoreServer. It changes the compaction
// concurrency of a store. While SetCompactionConcurrency is safe for concurrent
// use, it adds uncertainty about the compaction concurrency actually set on
21 changes: 11 additions & 10 deletions pkg/server/server_sql.go
@@ -961,16 +961,17 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
ClientCertExpirationCache: security.NewClientCertExpirationCache(
ctx, cfg.Settings, cfg.stopper, &timeutil.DefaultTimeSource{}, rootSQLMemoryMonitor,
),
RootMemoryMonitor: rootSQLMemoryMonitor,
TestingKnobs: sqlExecutorTestingKnobs,
CompactEngineSpanFunc: storageEngineClient.CompactEngineSpan,
CompactionConcurrencyFunc: storageEngineClient.SetCompactionConcurrency,
GetTableMetricsFunc: storageEngineClient.GetTableMetrics,
ScanStorageInternalKeysFunc: storageEngineClient.ScanStorageInternalKeys,
TraceCollector: traceCollector,
TenantUsageServer: cfg.tenantUsageServer,
KVStoresIterator: cfg.kvStoresIterator,
InspectzServer: cfg.inspectzServer,
RangeDescIteratorFactory: cfg.rangeDescIteratorFactory,
SyntheticPrivilegeCache: syntheticprivilegecache.New(
cfg.Settings, cfg.stopper, cfg.db,
serverCacheMemoryMonitor.MakeBoundAccount(),
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
@@ -1383,6 +1383,10 @@ type ExecutorConfig struct {
// overlap with a key range for a specified node and store.
GetTableMetricsFunc eval.GetTableMetricsFunc

// ScanStorageInternalKeysFunc is used to gather information about the types
// of internal keys (including snapshot-pinned keys) at each LSM level of a
// node's store.
ScanStorageInternalKeysFunc eval.ScanStorageInternalKeysFunc

// TraceCollector is used to contact all live nodes in the cluster, and
// collect trace spans from their inflight node registries.
TraceCollector *collector.TraceCollector
1 change: 1 addition & 0 deletions pkg/sql/planner.go
@@ -127,6 +127,7 @@ func (evalCtx *extendedEvalContext) copyFromExecCfg(execCfg *ExecutorConfig) {
evalCtx.CompactEngineSpan = execCfg.CompactEngineSpanFunc
evalCtx.SetCompactionConcurrency = execCfg.CompactionConcurrencyFunc
evalCtx.GetTableMetrics = execCfg.GetTableMetricsFunc
evalCtx.ScanStorageInternalKeys = execCfg.ScanStorageInternalKeysFunc
evalCtx.TestingKnobs = execCfg.EvalContextTestingKnobs
evalCtx.ClusterID = execCfg.NodeInfo.LogicalClusterID()
evalCtx.ClusterName = execCfg.RPCContext.ClusterName()
2 changes: 2 additions & 0 deletions pkg/sql/sem/builtins/fixed_oids.go
@@ -2440,6 +2440,8 @@ var builtinOidsArray = []string{
2467: `crdb_internal.request_statement_bundle(stmtFingerprint: string, planGist: string, samplingProbability: float, minExecutionLatency: interval, expiresAfter: interval) -> bool`,
2468: `crdb_internal.request_statement_bundle(stmtFingerprint: string, planGist: string, antiPlanGist: bool, samplingProbability: float, minExecutionLatency: interval, expiresAfter: interval) -> bool`,
2469: `crdb_internal.is_system_table_key(raw_key: bytes) -> bool`,
2470: `crdb_internal.scan_storage_internal_keys(node_id: int, store_id: int, start_key: bytes, end_key: bytes) -> tuple{int AS level, int AS node_id, int AS store_id, int AS snapshot_pinned_keys, int AS snapshot_pinned_keys_bytes, int AS point_key_delete_count, int AS point_key_set_count, int AS range_delete_count, int AS range_key_set_count, int AS range_key_delete_count}`,
2471: `crdb_internal.scan_storage_internal_keys(node_id: int, store_id: int, start_key: bytes, end_key: bytes, mb_per_second: int4) -> tuple{int AS level, int AS node_id, int AS store_id, int AS snapshot_pinned_keys, int AS snapshot_pinned_keys_bytes, int AS point_key_delete_count, int AS point_key_set_count, int AS range_delete_count, int AS range_key_set_count, int AS range_key_delete_count}`,
}

var builtinOidsBySignature map[string]oid.Oid
140 changes: 140 additions & 0 deletions pkg/sql/sem/builtins/generator_builtins.go
@@ -684,6 +684,36 @@ The last argument is a JSONB object containing the following optional fields:
volatility.Stable,
),
),
"crdb_internal.scan_storage_internal_keys": makeBuiltin(
tree.FunctionProperties{
Category: builtinconstants.CategorySystemInfo,
},
makeGeneratorOverload(
tree.ParamTypes{
{Name: "node_id", Typ: types.Int},
{Name: "store_id", Typ: types.Int},
{Name: "start_key", Typ: types.Bytes},
{Name: "end_key", Typ: types.Bytes},
},
storageInternalKeysGeneratorType,
makeStorageInternalKeysGenerator,
"Scans a store's storage engine, computing statistics describing the internal keys within the span [start_key, end_key). This function is rate limited to 10 megabytes per second.",
volatility.Volatile,
),
makeGeneratorOverload(
tree.ParamTypes{
{Name: "node_id", Typ: types.Int},
{Name: "store_id", Typ: types.Int},
{Name: "start_key", Typ: types.Bytes},
{Name: "end_key", Typ: types.Bytes},
{Name: "mb_per_second", Typ: types.Int4},
},
storageInternalKeysGeneratorType,
makeStorageInternalKeysGenerator,
"Scans a store's storage engine, computing statistics describing the internal keys within the span [start_key, end_key).",
volatility.Volatile,
),
),
}

var decodePlanGistGeneratorType = types.String
@@ -3272,6 +3302,116 @@ func makeTableMetricsGenerator(
return newTableMetricsIterator(evalCtx, nodeID, storeID, start, end), nil
}

type storageInternalKeysIterator struct {
metrics []enginepb.StorageInternalKeysMetrics
evalCtx *eval.Context

iterIdx int
nodeID int32
storeID int32
megabytesPerSecond int64
start []byte
end []byte
}

var storageInternalKeysGeneratorType = types.MakeLabeledTuple(
[]*types.T{types.Int, types.Int, types.Int, types.Int, types.Int, types.Int, types.Int, types.Int,
types.Int, types.Int},
[]string{
"level",
"node_id",
"store_id",
"snapshot_pinned_keys",
"snapshot_pinned_keys_bytes",
"point_key_delete_count",
"point_key_set_count",
"range_delete_count",
"range_key_set_count",
"range_key_delete_count",
},
)

var _ eval.ValueGenerator = (*storageInternalKeysIterator)(nil)

func newStorageInternalKeysGenerator(
evalCtx *eval.Context, nodeID, storeID int32, start, end []byte, megabytesPerSecond int64,
) *storageInternalKeysIterator {
return &storageInternalKeysIterator{
evalCtx: evalCtx,
nodeID: nodeID,
storeID: storeID,
start: start,
end: end,
megabytesPerSecond: megabytesPerSecond,
}
}

// Start implements the tree.ValueGenerator interface.
func (s *storageInternalKeysIterator) Start(ctx context.Context, _ *kv.Txn) error {
var err error
s.metrics, err = s.evalCtx.ScanStorageInternalKeys(ctx, s.nodeID, s.storeID, s.start, s.end, s.megabytesPerSecond)
if err != nil {
err = errors.Wrapf(err, "scanning storage internal keys for node %d store %d", s.nodeID, s.storeID)
}

return err
}

// Next implements the tree.ValueGenerator interface.
func (s *storageInternalKeysIterator) Next(_ context.Context) (bool, error) {
s.iterIdx++
return s.iterIdx <= len(s.metrics), nil
}

// Values implements the tree.ValueGenerator interface.
func (s *storageInternalKeysIterator) Values() (tree.Datums, error) {
metricsInfo := s.metrics[s.iterIdx-1]
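// A Level of -1 marks the row whose metrics were accumulated across all
// levels; render its level as NULL.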
levelDatum := tree.DNull

if metricsInfo.Level != -1 {
levelDatum = tree.NewDInt(tree.DInt(metricsInfo.Level))
}

return tree.Datums{
levelDatum,
tree.NewDInt(tree.DInt(s.nodeID)),
tree.NewDInt(tree.DInt(s.storeID)),
tree.NewDInt(tree.DInt(metricsInfo.SnapshotPinnedKeys)),
tree.NewDInt(tree.DInt(metricsInfo.SnapshotPinnedKeysBytes)),
tree.NewDInt(tree.DInt(metricsInfo.PointKeyDeleteCount)),
tree.NewDInt(tree.DInt(metricsInfo.PointKeySetCount)),
tree.NewDInt(tree.DInt(metricsInfo.RangeDeleteCount)),
tree.NewDInt(tree.DInt(metricsInfo.RangeKeySetCount)),
tree.NewDInt(tree.DInt(metricsInfo.RangeKeyDeleteCount)),
}, nil
}

// Close implements the tree.ValueGenerator interface.
func (s *storageInternalKeysIterator) Close(_ context.Context) {}

// ResolvedType implements the tree.ValueGenerator interface.
func (s *storageInternalKeysIterator) ResolvedType() *types.T {
return storageInternalKeysGeneratorType
}

func makeStorageInternalKeysGenerator(
ctx context.Context, evalCtx *eval.Context, args tree.Datums,
) (eval.ValueGenerator, error) {
isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx)
if err != nil {
return nil, err
}
if !isAdmin {
return nil, errInsufficientPriv
}
nodeID := int32(tree.MustBeDInt(args[0]))
storeID := int32(tree.MustBeDInt(args[1]))
start := []byte(tree.MustBeDBytes(args[2]))
end := []byte(tree.MustBeDBytes(args[3]))

// Default to a 10 MB/s rate limit when mb_per_second is not supplied.
megabytesPerSecond := int64(10)
if len(args) > 4 {
megabytesPerSecond = int64(tree.MustBeDInt(args[4]))
}

return newStorageInternalKeysGenerator(evalCtx, nodeID, storeID, start, end, megabytesPerSecond), nil
}
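
A hedged sketch of the ValueGenerator protocol that drives this iterator (the drainGenerator helper is hypothetical; the SQL layer performs the equivalent Start/Next/Values loop):

func drainGenerator(ctx context.Context, g eval.ValueGenerator) ([]tree.Datums, error) {
	// Start fetches all metrics up front; Next and Values then walk the slice.
	if err := g.Start(ctx, nil /* txn */); err != nil {
		return nil, err
	}
	defer g.Close(ctx)
	var rows []tree.Datums
	for {
		ok, err := g.Next(ctx)
		if err != nil || !ok {
			return rows, err
		}
		row, err := g.Values()
		if err != nil {
			return nil, err
		}
		rows = append(rows, row)
	}
}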

var tableSpanStatsGeneratorType = types.MakeLabeledTuple(
[]*types.T{types.Int, types.Int, types.Int, types.Int, types.Int, types.Int, types.Float},
[]string{"database_id", "table_id", "range_count", "approximate_disk_bytes", "live_bytes", "total_bytes", "live_percentage"},
3 changes: 3 additions & 0 deletions pkg/sql/sem/eval/context.go
@@ -227,6 +227,9 @@ type Context struct {
// GetTableMetrics is used in crdb_internal.sstable_metrics.
GetTableMetrics GetTableMetricsFunc

// ScanStorageInternalKeys is used in crdb_internal.scan_storage_internal_keys.
ScanStorageInternalKeys ScanStorageInternalKeysFunc

// SetCompactionConcurrency is used to change the compaction concurrency of
// a store.
SetCompactionConcurrency SetCompactionConcurrencyFunc
7 changes: 7 additions & 0 deletions pkg/sql/sem/eval/deps.go
@@ -448,6 +448,13 @@ type GetTableMetricsFunc func(
ctx context.Context, nodeID, storeID int32, startKey, endKey []byte,
) ([]enginepb.SSTableMetricsInfo, error)

// ScanStorageInternalKeysFunc is used to retrieve Pebble metrics for a key
// span (end-exclusive) at the given (nodeID, storeID). megabytesPerSecond
// specifies the maximum rate, in megabytes per second, at which the scan
// reads data.
type ScanStorageInternalKeysFunc func(
ctx context.Context, nodeID, storeID int32, startKey, endKey []byte, megabytesPerSecond int64,
) ([]enginepb.StorageInternalKeysMetrics, error)
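
As a sketch, any function with this shape satisfies the type; a hypothetical test stub:

// A stub satisfying ScanStorageInternalKeysFunc that returns no metrics.
var _ ScanStorageInternalKeysFunc = func(
	ctx context.Context, nodeID, storeID int32, startKey, endKey []byte, megabytesPerSecond int64,
) ([]enginepb.StorageInternalKeysMetrics, error) {
	return nil, nil
}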

// SetCompactionConcurrencyFunc is used to change the compaction concurrency of a
// store.
type SetCompactionConcurrencyFunc func(
2 changes: 2 additions & 0 deletions pkg/storage/engine.go
@@ -996,6 +996,8 @@ type Engine interface {
// CompactRange ensures that the specified range of key value pairs is
// optimized for space efficiency.
CompactRange(start, end roachpb.Key) error
// ScanStorageInternalKeys returns internal key statistics for each level of
// a Pebble store, restricted to keys that overlap the span [start, end).
// The scan rate is capped at megabytesPerSecond.
ScanStorageInternalKeys(start, end roachpb.Key, megabytesPerSecond int64) ([]enginepb.StorageInternalKeysMetrics, error)
// GetTableMetrics returns information about sstables that overlap start and end.
GetTableMetrics(start, end roachpb.Key) ([]enginepb.SSTableMetricsInfo, error)
// RegisterFlushCompletedCallback registers a callback that will be run for
12 changes: 12 additions & 0 deletions pkg/storage/enginepb/rocksdb.proto
@@ -26,6 +26,18 @@ message SSTableMetricsInfo {
uint64 approximate_span_bytes = 4 [(gogoproto.customname) = "ApproximateSpanBytes"];
}

message StorageInternalKeysMetrics {
// level is the LSM level at which the metrics were gathered. A level of -1
// indicates metrics accumulated across all levels.
int32 level = 1;
uint64 snapshot_pinned_keys = 2 [(gogoproto.customname) = "SnapshotPinnedKeys"];
uint64 snapshot_pinned_keys_bytes = 3 [(gogoproto.customname) = "SnapshotPinnedKeysBytes"];
uint64 point_key_delete_count = 4 [(gogoproto.customname) = "PointKeyDeleteCount"];
uint64 point_key_set_count = 5 [(gogoproto.customname) = "PointKeySetCount"];
uint64 range_delete_count = 6 [(gogoproto.customname) = "RangeDeleteCount"];
uint64 range_key_set_count = 7 [(gogoproto.customname) = "RangeKeySetCount"];
uint64 range_key_delete_count = 8 [(gogoproto.customname) = "RangeKeyDeleteCount"];
}

// SSTUserProperties contains the user-added properties of a single sstable.
message SSTUserProperties {
string path = 1;
38 changes: 38 additions & 0 deletions pkg/storage/pebble.go
@@ -2072,6 +2072,44 @@ func (p *Pebble) GetTableMetrics(start, end roachpb.Key) ([]enginepb.SSTableMetr
return metricsInfo, nil
}

// ScanStorageInternalKeys implements the Engine interface.
func (p *Pebble) ScanStorageInternalKeys(
start, end roachpb.Key, megabytesPerSecond int64,
) ([]enginepb.StorageInternalKeysMetrics, error) {
// ScanStatistics takes its rate limit in bytes per second; convert from (SI) megabytes.
stats, err := p.db.ScanStatistics(context.TODO(), start, end, pebble.ScanStatisticsOptions{LimitBytesPerSecond: 1000000 * megabytesPerSecond})
if err != nil {
return []enginepb.StorageInternalKeysMetrics{}, err
}

var metrics []enginepb.StorageInternalKeysMetrics

// Pebble's LSM has seven levels (L0 through L6); emit one row per level.
for level := 0; level < 7; level++ {
metrics = append(metrics, enginepb.StorageInternalKeysMetrics{
Level: int32(level),
SnapshotPinnedKeys: uint64(stats.Levels[level].SnapshotPinnedKeys),
SnapshotPinnedKeysBytes: stats.Levels[level].SnapshotPinnedKeysBytes,
PointKeyDeleteCount: uint64(stats.Levels[level].KindsCount[pebble.InternalKeyKindDelete]),
PointKeySetCount: uint64(stats.Levels[level].KindsCount[pebble.InternalKeyKindSet]),
RangeDeleteCount: uint64(stats.Levels[level].KindsCount[pebble.InternalKeyKindRangeDelete]),
RangeKeySetCount: uint64(stats.Levels[level].KindsCount[pebble.InternalKeyKindRangeKeySet]),
RangeKeyDeleteCount: uint64(stats.Levels[level].KindsCount[pebble.InternalKeyKindRangeKeyDelete]),
})
}

// Emit a final row, with Level set to -1, carrying the metrics accumulated
// across all levels.
metrics = append(metrics, enginepb.StorageInternalKeysMetrics{
Level: -1,
SnapshotPinnedKeys: uint64(stats.Accumulated.SnapshotPinnedKeys),
SnapshotPinnedKeysBytes: stats.Accumulated.SnapshotPinnedKeysBytes,
PointKeyDeleteCount: uint64(stats.Accumulated.KindsCount[pebble.InternalKeyKindDelete]),
PointKeySetCount: uint64(stats.Accumulated.KindsCount[pebble.InternalKeyKindSet]),
RangeDeleteCount: uint64(stats.Accumulated.KindsCount[pebble.InternalKeyKindRangeDelete]),
RangeKeySetCount: uint64(stats.Accumulated.KindsCount[pebble.InternalKeyKindRangeKeySet]),
RangeKeyDeleteCount: uint64(stats.Accumulated.KindsCount[pebble.InternalKeyKindRangeKeyDelete]),
})

return metrics, nil
}
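
A small consumer sketch (the helper is hypothetical) showing how the Level == -1 aggregate row relates to the per-level rows:

// totalPointDeletes prefers the aggregate row (Level == -1) emitted above,
// falling back to summing the per-level rows.
func totalPointDeletes(metrics []enginepb.StorageInternalKeysMetrics) uint64 {
	var sum uint64
	for _, m := range metrics {
		if m.Level == -1 {
			return m.PointKeyDeleteCount
		}
		sum += m.PointKeyDeleteCount
	}
	return sum
}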

// ApproximateDiskBytes implements the Engine interface.
func (p *Pebble) ApproximateDiskBytes(
from, to roachpb.Key,