Skip to content

Commit

Permalink
Merge branch 'main' into bgh/code-toggle
Browse files Browse the repository at this point in the history
  • Loading branch information
briangregoryholmes committed Jan 16, 2025
2 parents 404a71a + e260a59 commit 389c62d
Show file tree
Hide file tree
Showing 52 changed files with 932 additions and 276 deletions.
4 changes: 3 additions & 1 deletion cli/cmd/auth/login.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ func SelectOrgFlow(ctx context.Context, ch *cmdutil.Helper, interactive bool) er
return err
}

res, err := client.ListOrganizations(ctx, &adminv1.ListOrganizationsRequest{})
res, err := client.ListOrganizations(ctx, &adminv1.ListOrganizationsRequest{
PageSize: 1000,
})
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion cli/cmd/org/org.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ func OrgNames(ctx context.Context, ch *cmdutil.Helper) ([]string, error) {
return nil, err
}

resp, err := c.ListOrganizations(ctx, &adminv1.ListOrganizationsRequest{})
resp, err := c.ListOrganizations(ctx, &adminv1.ListOrganizationsRequest{
PageSize: 1000,
})
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion cli/cmd/org/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ func SetDefaultOrg(ctx context.Context, ch *cmdutil.Helper) error {
return err
}

res, err := c.ListOrganizations(ctx, &adminv1.ListOrganizationsRequest{})
res, err := c.ListOrganizations(ctx, &adminv1.ListOrganizationsRequest{
PageSize: 1000,
})
if err != nil {
return fmt.Errorf("listing orgs failed: %w", err)
}
Expand Down
6 changes: 1 addition & 5 deletions runtime/drivers/duckdb/duckdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"log/slog"
"net/url"
"path/filepath"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -500,10 +499,7 @@ func (c *connection) reopenDB(ctx context.Context) error {
// We want to set preserve_insertion_order=false in hosted environments only (where source data is never viewed directly). Setting it reduces batch data ingestion time by ~40%.
// Hack: Using AllowHostAccess as a proxy indicator for a hosted environment.
if !c.config.AllowHostAccess {
bootQueries = append(bootQueries,
"SET preserve_insertion_order TO false",
fmt.Sprintf("SET secret_directory = %s", safeSQLString(filepath.Join(dataDir, ".duckdb", "secrets"))),
)
bootQueries = append(bootQueries, "SET preserve_insertion_order TO false")
}

// Add init SQL if provided
Expand Down
161 changes: 159 additions & 2 deletions runtime/drivers/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"fmt"
"math"
"strings"
"time"

Expand All @@ -16,8 +17,12 @@ import (
_ "time/tzdata"
)

// ErrUnsupportedConnector is returned from Ingest for unsupported connectors.
var ErrUnsupportedConnector = errors.New("drivers: connector not supported")
var (
// ErrUnsupportedConnector is returned from Ingest for unsupported connectors.
ErrUnsupportedConnector = errors.New("drivers: connector not supported")
// ErrOptimizationFailure is returned when an optimization fails.
ErrOptimizationFailure = errors.New("drivers: optimization failure")
)

// WithConnectionFunc is a callback function that provides a context to be used in further OLAP store calls to enforce affinity to a single connection.
// It also provides pointers to the actual database/sql and database/sql/driver connections.
Expand Down Expand Up @@ -500,6 +505,158 @@ func (d Dialect) DateDiff(grain runtimev1.TimeGrain, t1, t2 time.Time) (string,
}
}

// SelectInlineResults returns a SQL query which inline results from the result set supplied along with the positional arguments and dimension values.
func (d Dialect) SelectInlineResults(result *Result) (string, []any, []any, error) {
values := make([]any, len(result.Schema.Fields))
valuePtrs := make([]any, len(result.Schema.Fields))
for i := range values {
valuePtrs[i] = &values[i]
}

var dimVals []any
var args []any

rows := 0
prefix := ""
suffix := ""
// creating inline query for all dialects in one loop, accumulating field exprs first and then creating the query can be more cleaner
for result.Next() {
if err := result.Scan(valuePtrs...); err != nil {
return "", nil, nil, fmt.Errorf("select inline: failed to scan value: %w", err)
}
if d == DialectDruid {
// format - select * from (values (1, 2), (3, 4)) t(a, b)
if rows == 0 {
prefix = "SELECT * FROM (VALUES "
suffix = "t("
}
if rows > 0 {
prefix += ", "
}
} else {
// format - select 1 as a, 2 as b union all select 3 as a, 4 as b
if rows > 0 {
prefix += " UNION ALL "
}
prefix += "SELECT "
}

dimVals = append(dimVals, values[0])
for i, v := range values {
if d == DialectDruid {
if i == 0 {
prefix += "("
}
if rows == 0 {
suffix += d.EscapeIdentifier(result.Schema.Fields[i].Name)
if i != len(result.Schema.Fields)-1 {
suffix += ", "
}
}
}
if i > 0 {
prefix += ", "
}

if d == DialectDruid {
ok, expr, err := d.GetValExpr(v, result.Schema.Fields[i].Type.Code)
if err != nil {
return "", nil, nil, fmt.Errorf("select inline: failed to get value expression: %w", err)
}
if !ok {
return "", nil, nil, fmt.Errorf("select inline: unsupported value type %q: %w", result.Schema.Fields[i].Type.Code, ErrOptimizationFailure)
}
prefix += expr
} else {
prefix += fmt.Sprintf("%s AS %s", "?", d.EscapeIdentifier(result.Schema.Fields[i].Name))
args = append(args, v)
}
}

if d == DialectDruid {
prefix += ")"
if rows == 0 {
suffix += ")"
}
}

rows++
}

if d == DialectDruid {
prefix += ") "
}

return prefix + suffix, args, dimVals, nil
}

func (d Dialect) GetValExpr(val any, typ runtimev1.Type_Code) (bool, string, error) {
if val == nil {
ok, expr := d.GetNullExpr(typ)
if ok {
return true, expr, nil
}
return false, "", fmt.Errorf("could not get null expr for type %q", typ)
}
switch typ {
case runtimev1.Type_CODE_STRING:
if s, ok := val.(string); ok {
return true, d.EscapeStringValue(s), nil
}
return false, "", fmt.Errorf("could not cast value %v to string type", val)
case runtimev1.Type_CODE_INT8, runtimev1.Type_CODE_INT16, runtimev1.Type_CODE_INT32, runtimev1.Type_CODE_INT64, runtimev1.Type_CODE_INT128, runtimev1.Type_CODE_INT256, runtimev1.Type_CODE_UINT8, runtimev1.Type_CODE_UINT16, runtimev1.Type_CODE_UINT32, runtimev1.Type_CODE_UINT64, runtimev1.Type_CODE_UINT128, runtimev1.Type_CODE_UINT256, runtimev1.Type_CODE_FLOAT32, runtimev1.Type_CODE_FLOAT64, runtimev1.Type_CODE_DECIMAL:
// check NaN and Inf
if f, ok := val.(float64); ok && (math.IsNaN(f) || math.IsInf(f, 0)) {
return true, "NULL", nil
}

return true, fmt.Sprintf("%v", val), nil
case runtimev1.Type_CODE_BOOL:
return true, fmt.Sprintf("%v", val), nil
case runtimev1.Type_CODE_TIME, runtimev1.Type_CODE_DATE, runtimev1.Type_CODE_TIMESTAMP:
if t, ok := val.(time.Time); ok {
if ok, expr := d.GetTimeExpr(t); ok {
return true, expr, nil
}
return false, "", fmt.Errorf("cannot get time expr for dialect %q", d)
}
return false, "", fmt.Errorf("unsupported time type %q", typ)
default:
return false, "", fmt.Errorf("unsupported type %q", typ)
}
}

func (d Dialect) GetNullExpr(typ runtimev1.Type_Code) (bool, string) {
if d == DialectDruid {
switch typ {
case runtimev1.Type_CODE_STRING:
return true, "CAST(NULL AS VARCHAR)"
case runtimev1.Type_CODE_INT8, runtimev1.Type_CODE_INT16, runtimev1.Type_CODE_INT32, runtimev1.Type_CODE_INT64, runtimev1.Type_CODE_INT128, runtimev1.Type_CODE_INT256, runtimev1.Type_CODE_UINT8, runtimev1.Type_CODE_UINT16, runtimev1.Type_CODE_UINT32, runtimev1.Type_CODE_UINT64, runtimev1.Type_CODE_UINT128, runtimev1.Type_CODE_UINT256:
return true, "CAST(NULL AS INTEGER)"
case runtimev1.Type_CODE_FLOAT32, runtimev1.Type_CODE_FLOAT64, runtimev1.Type_CODE_DECIMAL:
return true, "CAST(NULL AS DOUBLE)"
case runtimev1.Type_CODE_BOOL:
return true, "CAST(NULL AS BOOLEAN)"
case runtimev1.Type_CODE_TIME, runtimev1.Type_CODE_DATE, runtimev1.Type_CODE_TIMESTAMP:
return true, "CAST(NULL AS TIMESTAMP)"
default:
return false, ""
}
}
return true, "NULL"
}

func (d Dialect) GetTimeExpr(t time.Time) (bool, string) {
switch d {
case DialectClickHouse:
return true, fmt.Sprintf("parseDateTimeBestEffort('%s')", t.Format(time.RFC3339Nano))
case DialectDuckDB, DialectDruid, DialectPinot:
return true, fmt.Sprintf("CAST('%s' AS TIMESTAMP)", t.Format(time.RFC3339Nano))
default:
return false, ""
}
}

func druidTimeFloorSpecifier(grain runtimev1.TimeGrain) string {
switch grain {
case runtimev1.TimeGrain_TIME_GRAIN_MILLISECOND:
Expand Down
22 changes: 14 additions & 8 deletions runtime/drivers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ type InstanceConfig struct {
// MetricsComparisonsExact indicates whether to rewrite metrics comparison queries to approximately correct queries.
// Approximated comparison queries are faster but may not return comparison data points for all values.
MetricsApproximateComparisons bool `mapstructure:"rill.metrics.approximate_comparisons"`
// MetricsApproximateComparisonsCTE indicates whether to rewrite metrics comparison queries to use a CTE for base query.
MetricsApproximateComparisonsCTE bool `mapstructure:"rill.metrics.approximate_comparisons_cte"`
// MetricsApproxComparisonTwoPhaseLimit if query limit is less than this then rewrite metrics comparison queries to use a two-phase comparison approach where first query is used to get the base values and the second query is used to get the comparison values.
MetricsApproxComparisonTwoPhaseLimit int64 `mapstructure:"rill.metrics.approximate_comparisons_two_phase_limit"`
// MetricsExactifyDruidTopN indicates whether to split Druid TopN queries into two queries to increase the accuracy of the returned measures.
// Enabling it reduces the performance of Druid toplist queries.
// See runtime/metricsview/executor_rewrite_druid_exactify.go for more details.
Expand Down Expand Up @@ -130,14 +134,16 @@ func (i *Instance) ResolveVariables(withLowerKeys bool) map[string]string {
func (i *Instance) Config() (InstanceConfig, error) {
// Default config
res := InstanceConfig{
DownloadLimitBytes: int64(datasize.MB * 128),
InteractiveSQLRowLimit: 10_000,
StageChanges: true,
ModelDefaultMaterialize: false,
ModelMaterializeDelaySeconds: 0,
MetricsApproximateComparisons: true,
MetricsExactifyDruidTopN: false,
AlertsDefaultStreamingRefreshCron: "*/10 * * * *", // Every 10 minutes
DownloadLimitBytes: int64(datasize.MB * 128),
InteractiveSQLRowLimit: 10_000,
StageChanges: true,
ModelDefaultMaterialize: false,
ModelMaterializeDelaySeconds: 0,
MetricsApproximateComparisons: true,
MetricsApproximateComparisonsCTE: false,
MetricsApproxComparisonTwoPhaseLimit: 250,
MetricsExactifyDruidTopN: false,
AlertsDefaultStreamingRefreshCron: "*/10 * * * *", // Every 10 minutes
}

// Resolve variables
Expand Down
1 change: 1 addition & 0 deletions runtime/metricsview/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type AST struct {
// - FromSelect and optionally SpineSelect and/or LeftJoinSelects
// - FromSelect and optionally JoinComparisonSelect (for comparison CTE based optimization, this combination is used, both should be set and one of them will be used as CTE)
type SelectNode struct {
RawSelect *ExprNode // Raw SQL SELECT statement to use
Alias string // Alias for the node used by outer SELECTs to reference it.
IsCTE bool // Whether this node is a Common Table Expression
DimFields []FieldNode // Dimensions fields to select
Expand Down
6 changes: 6 additions & 0 deletions runtime/metricsview/astsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ func (b *sqlBuilder) writeSelectWithDisplayNames(n *SelectNode) error {
}

func (b *sqlBuilder) writeSelect(n *SelectNode) error {
if n.RawSelect != nil {
b.out.WriteString(n.RawSelect.Expr)
b.args = append(b.args, n.RawSelect.Args...)
return nil
}

b.out.WriteString("SELECT ")

for i, f := range n.DimFields {
Expand Down
12 changes: 10 additions & 2 deletions runtime/metricsview/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ func (e *Executor) Query(ctx context.Context, qry *Query, executionTime *time.Ti
return nil, runtime.ErrForbidden
}

// preserve the original limit, required in 2 phase comparison
ogLimit := qry.Limit

rowsCap, err := e.rewriteQueryEnforceCaps(qry)
if err != nil {
return nil, err
Expand Down Expand Up @@ -241,7 +244,12 @@ func (e *Executor) Query(ctx context.Context, qry *Query, executionTime *time.Ti
return nil, err
}

e.rewriteApproxComparisons(ast)
ok, err := e.rewriteTwoPhaseComparisons(ctx, qry, ast, ogLimit)
if err != nil {
return nil, err
} // TODO if !ok then can log a warning that two phase comparison is not possible with a reason

e.rewriteApproxComparisons(ast, ok)

if err := e.rewriteLimitsIntoSubqueries(ast); err != nil {
return nil, err
Expand Down Expand Up @@ -352,7 +360,7 @@ func (e *Executor) Export(ctx context.Context, qry *Query, executionTime *time.T
return "", err
}

e.rewriteApproxComparisons(ast)
e.rewriteApproxComparisons(ast, false)

if err := e.rewriteLimitsIntoSubqueries(ast); err != nil {
return "", err
Expand Down
Loading

0 comments on commit 389c62d

Please sign in to comment.