Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable cache for external tables using a cache control on metrics views #6265

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
1,877 changes: 956 additions & 921 deletions proto/gen/rill/runtime/v1/resources.pb.go

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions proto/gen/rill/runtime/v1/resources.pb.validate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions proto/gen/rill/runtime/v1/runtime.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5154,6 +5154,14 @@ definitions:
description: |-
Available time zones list preferred time zones using IANA location identifiers.
Deprecated: Now defined in the Explore resource.
cacheEnabled:
type: boolean
description: Cache controls for the metrics view.
cacheKeySql:
type: string
cacheKeyTtlSeconds:
type: string
format: int64
v1MetricsViewState:
type: object
properties:
Expand Down
4 changes: 4 additions & 0 deletions proto/rill/runtime/v1/resources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ message MetricsViewSpec {
// Available time zones list preferred time zones using IANA location identifiers.
// Deprecated: Now defined in the Explore resource.
repeated string available_time_zones = 10;
// Cache controls for the metrics view.
optional bool cache_enabled = 25;
string cache_key_sql = 26;
int64 cache_key_ttl_seconds = 27;
}

message SecurityRule {
Expand Down
16 changes: 16 additions & 0 deletions runtime/compilers/rillv1/parse_metrics_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ type MetricsViewYAML struct {
Dimension string `yaml:"dimension"`
} `yaml:"default_comparison"`
AvailableTimeRanges []ExploreTimeRangeYAML `yaml:"available_time_ranges"`
Cache struct {
Enabled *bool `yaml:"enabled"`
KeySQL string `yaml:"key_sql"`
KeyTTL string `yaml:"key_ttl"`
} `yaml:"cache"`
}

type MetricsViewFieldSelectorYAML struct {
Expand Down Expand Up @@ -778,6 +783,14 @@ func (p *Parser) parseMetricsView(node *Node) error {
node.Refs = append(node.Refs, ResourceName{Kind: ResourceKindTheme, Name: tmp.DefaultTheme})
}

var cacheTTLDuration time.Duration
if tmp.Cache.KeyTTL != "" {
cacheTTLDuration, err = time.ParseDuration(tmp.Cache.KeyTTL)
if err != nil {
return fmt.Errorf(`invalid "cache.key_ttl": %w`, err)
}
}

r, err := p.insertResource(ResourceKindMetricsView, node.Name, node.Paths, node.Refs...)
if err != nil {
return err
Expand Down Expand Up @@ -820,6 +833,9 @@ func (p *Parser) parseMetricsView(node *Node) error {
spec.Measures = measures

spec.SecurityRules = securityRules
spec.CacheEnabled = tmp.Cache.Enabled
spec.CacheKeySql = tmp.Cache.KeySQL
spec.CacheKeyTtlSeconds = int64(cacheTTLDuration.Seconds())

// Backwards compatibility: When the version is 0, populate the deprecated fields and also emit an Explore resource for the metrics view.
if node.Version > 0 {
Expand Down
48 changes: 43 additions & 5 deletions runtime/metricsview/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Executor struct {
rt *runtime.Runtime
instanceID string
metricsView *runtimev1.MetricsViewSpec
streaming bool
security *runtime.ResolvedSecurity
priority int

Expand All @@ -37,7 +38,7 @@ type Executor struct {
}

// NewExecutor creates a new Executor for the provided metrics view.
func NewExecutor(ctx context.Context, rt *runtime.Runtime, instanceID string, mv *runtimev1.MetricsViewSpec, sec *runtime.ResolvedSecurity, priority int) (*Executor, error) {
func NewExecutor(ctx context.Context, rt *runtime.Runtime, instanceID string, mv *runtimev1.MetricsViewSpec, streaming bool, sec *runtime.ResolvedSecurity, priority int) (*Executor, error) {
olap, release, err := rt.OLAP(ctx, instanceID, mv.Connector)
if err != nil {
return nil, fmt.Errorf("failed to acquire connector for metrics view: %w", err)
Expand All @@ -52,6 +53,7 @@ func NewExecutor(ctx context.Context, rt *runtime.Runtime, instanceID string, mv
rt: rt,
instanceID: instanceID,
metricsView: mv,
streaming: streaming,
security: sec,
priority: priority,
olap: olap,
Expand All @@ -65,10 +67,46 @@ func (e *Executor) Close() {
e.olapRelease()
}

// Cacheable returns whether the result of running the given query is cacheable.
func (e *Executor) Cacheable(qry *Query) bool {
// TODO: Get from OLAP instead of hardcoding
return e.olap.Dialect() == drivers.DialectDuckDB
// CacheKey returns a cache key based on the executor's metrics view's cache key configuration.
// If ok is false, caching is disabled for the metrics view.
func (e *Executor) CacheKey(ctx context.Context) ([]byte, bool, error) {
spec := e.metricsView
// Cache is disabled for metrics views based on external table
if (spec.CacheEnabled != nil && !*spec.CacheEnabled) || (spec.CacheEnabled == nil && e.streaming) {
return nil, false, nil
}

if spec.CacheKeySql == "" {
if !e.streaming {
// for metrics views on rill managed tables no cache key specific to the metrics view is required
return []byte(""), true, nil
}
// watermark is the default cache key for streaming metrics views
watermark, err := e.loadWatermark(ctx, nil)
if err != nil {
return nil, false, err
}
return []byte(watermark.Format(time.RFC3339)), true, nil
}

res, err := e.olap.Execute(ctx, &drivers.Statement{
Query: spec.CacheKeySql,
Priority: e.priority,
})
if err != nil {
return nil, false, err
}
defer res.Close()
var key string
for res.Next() {
if err := res.Scan(&key); err != nil {
return nil, false, err
}
}
k-anshul marked this conversation as resolved.
Show resolved Hide resolved
if res.Err() != nil {
return nil, false, err
}
return []byte(key), true, nil
k-anshul marked this conversation as resolved.
Show resolved Hide resolved
}

// ValidateQuery validates the provided query against the executor's metrics view.
Expand Down
6 changes: 6 additions & 0 deletions runtime/metricsview/executor_validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ func (e *Executor) ValidateMetricsView(ctx context.Context) (*ValidateMetricsVie
}
}

// Validate the cache key can be resolved
_, _, err = e.CacheKey(ctx)
if err != nil {
res.OtherErrs = append(res.OtherErrs, fmt.Errorf("failed to get cache key: %w", err))
}

return res, nil
}

Expand Down
2 changes: 1 addition & 1 deletion runtime/metricsview/executor_validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestValidateMetricsView(t *testing.T) {
},
}

e, err := metricsview.NewExecutor(context.Background(), rt, instanceID, mv, runtime.ResolvedSecurityOpen, 0)
e, err := metricsview.NewExecutor(context.Background(), rt, instanceID, mv, false, runtime.ResolvedSecurityOpen, 0)
require.NoError(t, err)

res, err := e.ValidateMetricsView(context.Background())
Expand Down
9 changes: 4 additions & 5 deletions runtime/queries/metricsview.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
var ErrForbidden = errors.New("action not allowed")

// resolveMVAndSecurityFromAttributes resolves the metrics view and security policy from the attributes
func resolveMVAndSecurityFromAttributes(ctx context.Context, rt *runtime.Runtime, instanceID, metricsViewName string, claims *runtime.SecurityClaims) (*runtimev1.MetricsViewSpec, *runtime.ResolvedSecurity, error) {
func resolveMVAndSecurityFromAttributes(ctx context.Context, rt *runtime.Runtime, instanceID, metricsViewName string, claims *runtime.SecurityClaims) (*runtimev1.MetricsViewState, *runtime.ResolvedSecurity, error) {
res, mv, err := lookupMetricsView(ctx, rt, instanceID, metricsViewName)
if err != nil {
return nil, nil, err
Expand All @@ -49,7 +49,7 @@ func resolveMVAndSecurityFromAttributes(ctx context.Context, rt *runtime.Runtime
}

// returns the metrics view and the time the catalog was last updated
func lookupMetricsView(ctx context.Context, rt *runtime.Runtime, instanceID, name string) (*runtimev1.Resource, *runtimev1.MetricsViewSpec, error) {
func lookupMetricsView(ctx context.Context, rt *runtime.Runtime, instanceID, name string) (*runtimev1.Resource, *runtimev1.MetricsViewState, error) {
ctrl, err := rt.Controller(ctx, instanceID)
if err != nil {
return nil, nil, status.Error(codes.InvalidArgument, err.Error())
Expand All @@ -61,12 +61,11 @@ func lookupMetricsView(ctx context.Context, rt *runtime.Runtime, instanceID, nam
}

mv := res.GetMetricsView()
spec := mv.State.ValidSpec
if spec == nil {
if mv.State.ValidSpec == nil {
return nil, nil, status.Errorf(codes.InvalidArgument, "metrics view %q is invalid", name)
}

return res, spec, nil
return res, mv.State, nil
}

func metricsQuery(ctx context.Context, olap drivers.OLAPStore, priority int, sql string, args []any) ([]*runtimev1.MetricsViewColumn, []*structpb.Struct, error) {
Expand Down
4 changes: 2 additions & 2 deletions runtime/queries/metricsview_aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (q *MetricsViewAggregation) Resolve(ctx context.Context, rt *runtime.Runtim
return fmt.Errorf("error rewriting to metrics query: %w", err)
}

e, err := metricsview.NewExecutor(ctx, rt, instanceID, mv, security, priority)
e, err := metricsview.NewExecutor(ctx, rt, instanceID, mv.ValidSpec, mv.Streaming, security, priority)
if err != nil {
return err
}
Expand Down Expand Up @@ -124,7 +124,7 @@ func (q *MetricsViewAggregation) Export(ctx context.Context, rt *runtime.Runtime
return fmt.Errorf("error rewriting to metrics query: %w", err)
}

e, err := metricsview.NewExecutor(ctx, rt, instanceID, mv, security, opts.Priority)
e, err := metricsview.NewExecutor(ctx, rt, instanceID, mv.ValidSpec, mv.Streaming, security, opts.Priority)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions runtime/queries/metricsview_comparison_toplist.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (q *MetricsViewComparison) Resolve(ctx context.Context, rt *runtime.Runtime
return fmt.Errorf("error rewriting to metrics query: %w", err)
}

e, err := metricsview.NewExecutor(ctx, rt, instanceID, mv, security, priority)
e, err := metricsview.NewExecutor(ctx, rt, instanceID, mv.ValidSpec, mv.Streaming, security, priority)
if err != nil {
return err
}
Expand Down Expand Up @@ -187,7 +187,7 @@ func (q *MetricsViewComparison) Export(ctx context.Context, rt *runtime.Runtime,
return fmt.Errorf("error rewriting to metrics query: %w", err)
}

e, err := metricsview.NewExecutor(ctx, rt, instanceID, mv, security, opts.Priority)
e, err := metricsview.NewExecutor(ctx, rt, instanceID, mv.ValidSpec, mv.Streaming, security, opts.Priority)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion runtime/queries/metricsview_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (q *MetricsViewSchema) Resolve(ctx context.Context, rt *runtime.Runtime, in
return err
}

e, err := metricsview.NewExecutor(ctx, rt, instanceID, mv, sec, priority)
e, err := metricsview.NewExecutor(ctx, rt, instanceID, mv.ValidSpec, mv.Streaming, sec, priority)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion runtime/queries/metricsview_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (q *MetricsViewSearch) Resolve(ctx context.Context, rt *runtime.Runtime, in
return err
}

exec, err := metricsview.NewExecutor(ctx, rt, instanceID, mv, sec, priority)
exec, err := metricsview.NewExecutor(ctx, rt, instanceID, mv.ValidSpec, mv.Streaming, sec, priority)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions runtime/queries/metricsview_timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,16 @@ func (q *MetricsViewTimeSeries) Resolve(ctx context.Context, rt *runtime.Runtime
return err
}

if mv.TimeDimension == "" {
if mv.ValidSpec.TimeDimension == "" {
return fmt.Errorf("metrics view '%s' does not have a time dimension", q.MetricsViewName)
}

qry, err := q.rewriteToMetricsViewQuery(mv.TimeDimension)
qry, err := q.rewriteToMetricsViewQuery(mv.ValidSpec.TimeDimension)
if err != nil {
return fmt.Errorf("error rewriting to metrics query: %w", err)
}

e, err := metricsview.NewExecutor(ctx, rt, instanceID, mv, security, priority)
e, err := metricsview.NewExecutor(ctx, rt, instanceID, mv.ValidSpec, mv.Streaming, security, priority)
if err != nil {
return err
}
Expand All @@ -99,7 +99,7 @@ func (q *MetricsViewTimeSeries) Resolve(ctx context.Context, rt *runtime.Runtime
}
defer res.Close()

return q.populateResult(res, mv.TimeDimension, mv)
return q.populateResult(res, mv.ValidSpec.TimeDimension, mv.ValidSpec)
}

func (q *MetricsViewTimeSeries) Export(ctx context.Context, rt *runtime.Runtime, instanceID string, w io.Writer, opts *runtime.ExportOptions) error {
Expand Down
4 changes: 2 additions & 2 deletions runtime/queries/metricsview_toplist.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (q *MetricsViewToplist) Resolve(ctx context.Context, rt *runtime.Runtime, i
return fmt.Errorf("error rewriting to metrics query: %w", err)
}

e, err := metricsview.NewExecutor(ctx, rt, instanceID, mv, security, priority)
e, err := metricsview.NewExecutor(ctx, rt, instanceID, mv.ValidSpec, mv.Streaming, security, priority)
if err != nil {
return err
}
Expand Down Expand Up @@ -112,7 +112,7 @@ func (q *MetricsViewToplist) Export(ctx context.Context, rt *runtime.Runtime, in
return fmt.Errorf("error rewriting to metrics query: %w", err)
}

e, err := metricsview.NewExecutor(ctx, rt, instanceID, mv, security, opts.Priority)
e, err := metricsview.NewExecutor(ctx, rt, instanceID, mv.ValidSpec, mv.Streaming, security, opts.Priority)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion runtime/queries/metricsview_totals.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (q *MetricsViewTotals) Resolve(ctx context.Context, rt *runtime.Runtime, in
return fmt.Errorf("error rewriting to metrics query: %w", err)
}

e, err := metricsview.NewExecutor(ctx, rt, instanceID, mv, security, priority)
e, err := metricsview.NewExecutor(ctx, rt, instanceID, mv.ValidSpec, mv.Streaming, security, priority)
if err != nil {
return err
}
Expand Down
Loading
Loading