Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support where_pending and where_errored filters in GetModelSplits #5727

Merged
merged 4 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cli/cmd/project/splits.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

func SplitsCmd(ch *cmdutil.Helper) *cobra.Command {
var project, path, model string
var local bool
var pending, errored, local bool
var pageSize uint32
var pageToken string

Expand Down Expand Up @@ -42,6 +42,8 @@ func SplitsCmd(ch *cmdutil.Helper) *cobra.Command {
res, err := rt.GetModelSplits(cmd.Context(), &runtimev1.GetModelSplitsRequest{
InstanceId: instanceID,
Model: model,
Pending: pending,
Errored: errored,
PageSize: pageSize,
PageToken: pageToken,
})
Expand All @@ -64,6 +66,9 @@ func SplitsCmd(ch *cmdutil.Helper) *cobra.Command {
splitsCmd.Flags().StringVar(&project, "project", "", "Project Name")
splitsCmd.Flags().StringVar(&path, "path", ".", "Project directory")
splitsCmd.Flags().StringVar(&model, "model", "", "Model Name")
splitsCmd.Flags().BoolVar(&pending, "pending", false, "Only fetch pending splits")
begelundmuller marked this conversation as resolved.
Show resolved Hide resolved
splitsCmd.Flags().BoolVar(&errored, "errored", false, "Only fetch errored splits")
splitsCmd.MarkFlagsMutuallyExclusive("pending", "errored")
splitsCmd.Flags().BoolVar(&local, "local", false, "Target locally running Rill")
splitsCmd.Flags().Uint32Var(&pageSize, "page-size", 50, "Number of splits to return per page")
splitsCmd.Flags().StringVar(&pageToken, "page-token", "", "Pagination token")
Expand Down
1,036 changes: 528 additions & 508 deletions proto/gen/rill/runtime/v1/api.pb.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions proto/gen/rill/runtime/v1/api.pb.validate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions proto/gen/rill/runtime/v1/runtime.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,14 @@ paths:
in: path
required: true
type: string
- name: pending
in: query
required: false
type: boolean
- name: errored
in: query
required: false
type: boolean
- name: pageSize
in: query
required: false
Expand Down
2 changes: 2 additions & 0 deletions proto/rill/runtime/v1/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,8 @@ message GetExploreResponse {
message GetModelSplitsRequest {
string instance_id = 1;
string model = 2;
bool pending = 5;
bool errored = 6;
uint32 page_size = 3 [(validate.rules).uint32 = {ignore_empty: true, lte: 10000}];
string page_token = 4;
}
Expand Down
13 changes: 11 additions & 2 deletions runtime/drivers/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ type CatalogStore interface {
DeleteResource(ctx context.Context, v int64, k, n string) error
DeleteResources(ctx context.Context) error

FindModelSplits(ctx context.Context, modelID string, afterIndex int, afterKey string, limit int) ([]ModelSplit, error)
FindModelSplits(ctx context.Context, opts *FindModelSplitsOptions) ([]ModelSplit, error)
FindModelSplitsByKeys(ctx context.Context, modelID string, keys []string) ([]ModelSplit, error)
FindModelSplitsByPending(ctx context.Context, modelID string, limit int) ([]ModelSplit, error)
CheckModelSplitsHaveErrors(ctx context.Context, modelID string) (bool, error)
InsertModelSplit(ctx context.Context, modelID string, split ModelSplit) error
UpdateModelSplit(ctx context.Context, modelID string, split ModelSplit) error
Expand Down Expand Up @@ -78,3 +77,13 @@ type ModelSplit struct {
// Elapsed is the duration of the last execution of the split.
Elapsed time.Duration
}

// FindModelSplitsOptions is used to filter model splits.
type FindModelSplitsOptions struct {
ModelID string
Limit int
WherePending bool
WhereErrored bool
AfterIndex int
AfterKey string
}
2 changes: 1 addition & 1 deletion runtime/drivers/catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func testCatalogSplits(t *testing.T, catalog drivers.CatalogStore) {
err = catalog.InsertModelSplit(ctx, modelID, split)
require.NoError(t, err)

splits, err = catalog.FindModelSplitsByPending(ctx, modelID, 10)
splits, err = catalog.FindModelSplits(ctx, &drivers.FindModelSplitsOptions{ModelID: modelID, WherePending: true, Limit: 10})
require.NoError(t, err)
require.Len(t, splits, 1)
requireSplitEqual(t, split, splits[0])
Expand Down
6 changes: 1 addition & 5 deletions runtime/drivers/duckdb/catalogv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,18 +189,14 @@ func (c *connection) DeleteResources(ctx context.Context) error {
return nil
}

func (c *connection) FindModelSplits(ctx context.Context, modelID string, afterIndex int, afterKey string, limit int) ([]drivers.ModelSplit, error) {
func (c *connection) FindModelSplits(ctx context.Context, opts *drivers.FindModelSplitsOptions) ([]drivers.ModelSplit, error) {
return nil, drivers.ErrNotImplemented
}

func (c *connection) FindModelSplitsByKeys(ctx context.Context, modelID string, keys []string) ([]drivers.ModelSplit, error) {
return nil, drivers.ErrNotImplemented
}

func (c *connection) FindModelSplitsByPending(ctx context.Context, modelID string, limit int) ([]drivers.ModelSplit, error) {
return nil, drivers.ErrNotImplemented
}

func (c *connection) CheckModelSplitsHaveErrors(ctx context.Context, modelID string) (bool, error) {
return false, drivers.ErrNotImplemented
}
Expand Down
71 changes: 28 additions & 43 deletions runtime/drivers/sqlite/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,34 @@ func (c *catalogStore) DeleteResources(ctx context.Context) error {
return nil
}

func (c *catalogStore) FindModelSplits(ctx context.Context, modelID string, afterIndex int, afterKey string, limit int) ([]drivers.ModelSplit, error) {
rows, err := c.db.QueryContext(
ctx,
"SELECT key, data_json, idx, watermark, executed_on, error, elapsed_ms FROM model_splits WHERE instance_id=? AND model_id=? AND (idx > ? OR (idx = ? AND key > ?)) ORDER BY idx, key LIMIT ?",
c.instanceID,
modelID,
afterIndex,
afterIndex,
afterKey,
limit,
)
func (c *catalogStore) FindModelSplits(ctx context.Context, opts *drivers.FindModelSplitsOptions) ([]drivers.ModelSplit, error) {
var qry strings.Builder
var args []any

qry.WriteString("SELECT key, data_json, idx, watermark, executed_on, error, elapsed_ms FROM model_splits WHERE instance_id=? AND model_id=?")
args = append(args, c.instanceID, opts.ModelID)

if opts.WhereErrored {
qry.WriteString(" AND error != ''")
}

if opts.WherePending {
qry.WriteString(" AND executed_on IS NULL")
}

if opts.AfterIndex != 0 || opts.AfterKey != "" {
qry.WriteString(" AND (idx > ? OR (idx = ? AND key > ?))")
args = append(args, opts.AfterIndex, opts.AfterIndex, opts.AfterKey)
}

qry.WriteString(" ORDER BY idx, key")

if opts.Limit != 0 {
qry.WriteString(" LIMIT ?")
args = append(args, opts.Limit)
}

rows, err := c.db.QueryContext(ctx, qry.String(), args...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -220,38 +237,6 @@ func (c *catalogStore) FindModelSplitsByKeys(ctx context.Context, modelID string
return res, nil
}

func (c *catalogStore) FindModelSplitsByPending(ctx context.Context, modelID string, limit int) ([]drivers.ModelSplit, error) {
rows, err := c.db.QueryxContext(
ctx,
"SELECT key, data_json, idx, watermark, executed_on, error, elapsed_ms FROM model_splits WHERE instance_id=? AND model_id=? AND executed_on IS NULL ORDER BY idx LIMIT ?",
c.instanceID,
modelID,
limit,
)
if err != nil {
return nil, err
}
defer rows.Close()

var res []drivers.ModelSplit
for rows.Next() {
var elapsedMs int64
r := drivers.ModelSplit{}
err := rows.Scan(&r.Key, &r.DataJSON, &r.Index, &r.Watermark, &r.ExecutedOn, &r.Error, &elapsedMs)
if err != nil {
return nil, err
}
r.Elapsed = time.Duration(elapsedMs) * time.Millisecond
res = append(res, r)
}

if rows.Err() != nil {
return nil, err
}

return res, nil
}

func (c *catalogStore) CheckModelSplitsHaveErrors(ctx context.Context, modelID string) (bool, error) {
rows, err := c.db.QueryContext(
ctx,
Expand Down
12 changes: 10 additions & 2 deletions runtime/reconcilers/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,11 @@ func (r *ModelReconciler) executeAll(ctx context.Context, self *runtimev1.Resour
// This enables the first split to create the initial result (such as a table) that the other splits incrementally build upon.
if !incrementalRun {
// Find the first split
splits, err := catalog.FindModelSplitsByPending(ctx, model.State.SplitsModelId, 1)
splits, err := catalog.FindModelSplits(ctx, &drivers.FindModelSplitsOptions{
ModelID: model.State.SplitsModelId,
WherePending: true,
Limit: 1,
})
if err != nil {
return "", nil, fmt.Errorf("failed to load first split: %w", err)
}
Expand Down Expand Up @@ -930,7 +934,11 @@ func (r *ModelReconciler) executeAll(ctx context.Context, self *runtimev1.Resour
// Get a batch of pending splits
// Note: We do this when no workers are running because splits are considered pending if they have not completed execution yet.
// This reduces concurrency when processing the last handful of splits in each batch, but with large batch sizes it's worth the simplicity for now.
splits, err := catalog.FindModelSplitsByPending(ctx, model.State.SplitsModelId, _modelPendingSplitsBatchSize)
splits, err := catalog.FindModelSplits(ctx, &drivers.FindModelSplitsOptions{
ModelID: model.State.SplitsModelId,
WherePending: true,
Limit: _modelPendingSplitsBatchSize,
})
if err != nil {
return "", nil, err
}
Expand Down
13 changes: 11 additions & 2 deletions runtime/server/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (s *Server) GetModelSplits(ctx context.Context, req *runtimev1.GetModelSpli
return &runtimev1.GetModelSplitsResponse{}, nil
}

afterIdx := -1
afterIdx := 0
afterKey := ""
if req.PageToken != "" {
err := unmarshalPageToken(req.PageToken, &afterIdx, &afterKey)
Expand All @@ -303,7 +303,16 @@ func (s *Server) GetModelSplits(ctx context.Context, req *runtimev1.GetModelSpli
}
defer release()

splits, err := catalog.FindModelSplits(ctx, splitsModelID, afterIdx, afterKey, validPageSize(req.PageSize))
opts := &drivers.FindModelSplitsOptions{
ModelID: splitsModelID,
WherePending: req.Pending,
WhereErrored: req.Errored,
AfterIndex: afterIdx,
AfterKey: afterKey,
Limit: validPageSize(req.PageSize),
}

splits, err := catalog.FindModelSplits(ctx, opts)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand Down
12 changes: 12 additions & 0 deletions web-common/src/proto/gen/rill/runtime/v1/api_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3092,6 +3092,16 @@ export class GetModelSplitsRequest extends Message<GetModelSplitsRequest> {
*/
model = "";

/**
* @generated from field: bool pending = 5;
*/
pending = false;

/**
* @generated from field: bool errored = 6;
*/
errored = false;

/**
* @generated from field: uint32 page_size = 3;
*/
Expand All @@ -3112,6 +3122,8 @@ export class GetModelSplitsRequest extends Message<GetModelSplitsRequest> {
static readonly fields: FieldList = proto3.util.newFieldList(() => [
{ no: 1, name: "instance_id", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 2, name: "model", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 5, name: "pending", kind: "scalar", T: 8 /* ScalarType.BOOL */ },
{ no: 6, name: "errored", kind: "scalar", T: 8 /* ScalarType.BOOL */ },
{ no: 3, name: "page_size", kind: "scalar", T: 13 /* ScalarType.UINT32 */ },
{ no: 4, name: "page_token", kind: "scalar", T: 9 /* ScalarType.STRING */ },
]);
Expand Down
2 changes: 2 additions & 0 deletions web-common/src/runtime-client/gen/index.schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,8 @@ export type QueryServiceColumnCardinalityParams = {
};

export type RuntimeServiceGetModelSplitsParams = {
pending?: boolean;
errored?: boolean;
pageSize?: number;
pageToken?: string;
};
Expand Down
Loading