diff --git a/runtime/drivers/duckdb/duckdb.go b/runtime/drivers/duckdb/duckdb.go
index edb15421052..b538372bd06 100644
--- a/runtime/drivers/duckdb/duckdb.go
+++ b/runtime/drivers/duckdb/duckdb.go
@@ -387,6 +387,9 @@ func (c *connection) AsModelExecutor(instanceID string, opts *drivers.ModelExecu
 		if f, ok := opts.InputHandle.AsFileStore(); ok && opts.InputConnector == "local_file" {
 			return &localFileToSelfExecutor{c, f}, true
 		}
+		if opts.InputHandle.Driver() == "mysql" || opts.InputHandle.Driver() == "postgres" {
+			return &sqlStoreToSelfExecutor{c}, true
+		}
 	}
 	if opts.InputHandle == c {
 		if opts.OutputHandle.Driver() == "file" {
diff --git a/runtime/drivers/duckdb/model_executor_sqlstore_self.go b/runtime/drivers/duckdb/model_executor_sqlstore_self.go
new file mode 100644
index 00000000000..b3df25b2447
--- /dev/null
+++ b/runtime/drivers/duckdb/model_executor_sqlstore_self.go
@@ -0,0 +1,140 @@
+package duckdb
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	"github.com/mitchellh/mapstructure"
+	"github.com/rilldata/rill/runtime/drivers"
+	"github.com/rilldata/rill/runtime/drivers/mysql"
+	"github.com/rilldata/rill/runtime/drivers/postgres"
+)
+
+// sqlStoreToSelfInputProps holds the input properties of a model that reads
+// from an external SQL store (mysql or postgres) into DuckDB.
+type sqlStoreToSelfInputProps struct {
+	// SQL is the query to run against the external database. Required.
+	SQL string `mapstructure:"sql"`
+	// DSN is the connection string. It cannot be combined with DatabaseURL.
+	DSN string `mapstructure:"dsn"`
+	// DatabaseURL is an alternative property for the connection string.
+	DatabaseURL string `mapstructure:"database_url"`
+}
+
+// resolveDSN returns DSN when it is set and DatabaseURL otherwise.
+func (p *sqlStoreToSelfInputProps) resolveDSN() string {
+	if p.DSN != "" {
+		return p.DSN
+	}
+	return p.DatabaseURL
+}
+
+// Validate returns an error when 'sql' is missing (or blank) or when both
+// 'dsn' and 'database_url' are set.
+func (p *sqlStoreToSelfInputProps) Validate() error {
+	if strings.TrimSpace(p.SQL) == "" {
+		return fmt.Errorf("missing property 'sql'")
+	}
+	if p.DSN != "" && p.DatabaseURL != "" {
+		return fmt.Errorf("cannot set both 'dsn' and 'database_url'")
+	}
+	return nil
+}
+
+// sqlStoreToSelfExecutor executes models whose input is a mysql or postgres
+// connector by attaching the external database to the DuckDB connection c.
+type sqlStoreToSelfExecutor struct {
+	c *connection
+}
+
+var _ drivers.ModelExecutor = &sqlStoreToSelfExecutor{}
+
+// Concurrency reports that only a concurrency of 1 is supported: it returns
+// (1, true) when desired is at most 1 and (0, false) otherwise.
+func (e *sqlStoreToSelfExecutor) Concurrency(desired int) (int, bool) {
+	if desired > 1 {
+		return 0, false
+	}
+	return 1, true
+}
+
+// Execute validates the input properties, rewrites them into DuckDB SQL that
+// attaches the external database and queries it, and then delegates the run
+// to selfToSelfExecutor.
+func (e *sqlStoreToSelfExecutor) Execute(ctx context.Context, opts *drivers.ModelExecuteOptions) (*drivers.ModelResult, error) {
+	inputProps := &sqlStoreToSelfInputProps{}
+	if err := mapstructure.WeakDecode(opts.InputProperties, inputProps); err != nil {
+		return nil, fmt.Errorf("failed to parse input properties: %w", err)
+	}
+	if err := inputProps.Validate(); err != nil {
+		return nil, fmt.Errorf("invalid input properties: %w", err)
+	}
+
+	// Build the model executor options with updated input properties.
+	// opts is shallow-copied so the caller's struct is not modified.
+	clone := *opts
+	newInputProps, err := e.modelInputProperties(opts.ModelName, opts.InputConnector, opts.InputHandle, inputProps)
+	if err != nil {
+		return nil, err
+	}
+	clone.InputProperties = newInputProps
+
+	// execute
+	executor := &selfToSelfExecutor{c: e.c}
+	return executor.Execute(ctx, &clone)
+}
+
+// modelInputProperties builds the input properties for selfToSelfExecutor: a
+// pre-exec statement that installs the extension and attaches the external
+// database, a SELECT that forwards the user query to it, and a post-exec
+// statement that detaches it again. The DSN is taken from the model's input
+// properties and falls back to the connector config of inputHandle.
+func (e *sqlStoreToSelfExecutor) modelInputProperties(modelName, inputConnector string, inputHandle drivers.Handle, inputProps *sqlStoreToSelfInputProps) (map[string]any, error) {
+	m := &ModelInputProperties{}
+	dbName := fmt.Sprintf("%s__%s", modelName, inputConnector)
+	safeDBName := safeName(dbName)
+	// Trim surrounding whitespace before dropping the trailing semicolon;
+	// otherwise a query ending in ";\n" would keep its semicolon.
+	userQuery := strings.TrimSuffix(strings.TrimSpace(inputProps.SQL), ";")
+	switch inputHandle.Driver() {
+	case "mysql":
+		dsn := inputProps.resolveDSN()
+		if dsn == "" {
+			// may be configured via a connector
+			var config mysql.ConfigProperties
+			if err := mapstructure.Decode(inputHandle.Config(), &config); err != nil {
+				return nil, fmt.Errorf("failed to parse mysql connector config: %w", err)
+			}
+			dsn = rewriteMySQLDSN(config.DSN)
+		}
+		if dsn == "" {
+			return nil, fmt.Errorf("must set `dsn` for models that transfer data from `mysql` to `duckdb`")
+		}
+		m.PreExec = fmt.Sprintf("INSTALL 'MYSQL'; LOAD 'MYSQL'; ATTACH %s AS %s (TYPE mysql, READ_ONLY)", safeSQLString(dsn), safeDBName)
+		m.SQL = fmt.Sprintf("SELECT * FROM mysql_query(%s, %s)", safeSQLString(dbName), safeSQLString(userQuery))
+	case "postgres":
+		dsn := inputProps.resolveDSN()
+		if dsn == "" {
+			// may be configured via a connector
+			var config postgres.ConfigProperties
+			if err := mapstructure.Decode(inputHandle.Config(), &config); err != nil {
+				return nil, fmt.Errorf("failed to parse postgres connector config: %w", err)
+			}
+			dsn = config.ResolveDSN()
+		}
+		if dsn == "" {
+			return nil, fmt.Errorf("must set `database_url` or `dsn` for models that transfer data from `postgres` to `duckdb`")
+		}
+		m.PreExec = fmt.Sprintf("INSTALL 'POSTGRES'; LOAD 'POSTGRES'; ATTACH %s AS %s (TYPE postgres, READ_ONLY)", safeSQLString(dsn), safeDBName)
+		m.SQL = fmt.Sprintf("SELECT * FROM postgres_query(%s, %s)", safeSQLString(dbName), safeSQLString(userQuery))
+	default:
+		return nil, fmt.Errorf("internal error: unsupported external database: %s", inputHandle.Driver())
+	}
+	m.PostExec = fmt.Sprintf("DETACH %s", safeDBName)
+	propsMap := make(map[string]any)
+	if err := mapstructure.Decode(m, &propsMap); err != nil {
+		return nil, err
+	}
+	return propsMap, nil
+}
diff --git a/runtime/drivers/duckdb/olap.go b/runtime/drivers/duckdb/olap.go
index b73f6e3ca29..8ece0fc2312 100644
--- a/runtime/drivers/duckdb/olap.go
+++ b/runtime/drivers/duckdb/olap.go
@@ -262,7 +262,17 @@ func (c *connection) InsertTableAsSelect(ctx context.Context, name, sql string,
 
 	if opts.Strategy == drivers.IncrementalStrategyAppend {
 		err = db.MutateTable(ctx, name, func(ctx context.Context, conn *sqlx.Conn) error {
+			if opts.BeforeInsert != "" {
+				_, err := conn.ExecContext(ctx, opts.BeforeInsert)
+				if err != nil {
+					return err
+				}
+			}
 			_, err := conn.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s %s (%s\n)", safeSQLName(name), byNameClause, sql))
+			if opts.AfterInsert != "" {
+				_, afterInsertExecErr := conn.ExecContext(ctx, opts.AfterInsert)
+				return errors.Join(err, afterInsertExecErr)
+			}
 			return err
 		})
 		return c.checkErr(err)
diff --git a/runtime/drivers/duckdb/transporter_mysql_to_duckDB_test.go b/runtime/drivers/duckdb/transporter_mysql_to_duckDB_test.go
index db8f3575e9a..4b9e74cee70 100644
--- a/runtime/drivers/duckdb/transporter_mysql_to_duckDB_test.go
+++ 
b/runtime/drivers/duckdb/transporter_mysql_to_duckDB_test.go
@@ -1,22 +1,23 @@
-package duckdb
+package duckdb_test
 
 import (
 	"context"
 	"database/sql"
+	"fmt"
 	"testing"
+	"time"
 
 	"github.com/rilldata/rill/runtime/drivers"
 	"github.com/rilldata/rill/runtime/pkg/activity"
 	"github.com/rilldata/rill/runtime/storage"
 	"github.com/stretchr/testify/require"
+	"github.com/testcontainers/testcontainers-go"
+	"github.com/testcontainers/testcontainers-go/wait"
 	"go.uber.org/zap"
 
-	"fmt"
-	"time"
-
 	_ "github.com/go-sql-driver/mysql"
-	"github.com/testcontainers/testcontainers-go"
-	"github.com/testcontainers/testcontainers-go/wait"
+	_ "github.com/rilldata/rill/runtime/drivers/duckdb"
+	_ "github.com/rilldata/rill/runtime/drivers/mysql"
 )
 
 var mysqlInitStmt = `
@@ -98,6 +99,10 @@ func TestMySQLToDuckDBTransfer(t *testing.T) {
 	t.Run("AllDataTypes", func(t *testing.T) {
 		allMySQLDataTypesTest(t, db, dsn)
 	})
+
+	t.Run("model_executor_mysql_to_duckDB", func(t *testing.T) {
+		mysqlToDuckDB(t, fmt.Sprintf("host=%s port=%v database=mydb user=myuser password=mypassword", host, port.Int()))
+	})
 }
 
 func allMySQLDataTypesTest(t *testing.T, db *sql.DB, dsn string) {
@@ -109,7 +114,11 @@ func allMySQLDataTypesTest(t *testing.T, db *sql.DB, dsn string) {
 	require.NoError(t, err)
 	olap, _ := to.AsOLAP("")
 
-	tr := newDuckDBToDuckDB(to.(*connection), "mysql", zap.NewNop())
+	inputHandle, err := drivers.Open("mysql", "default", map[string]any{"dsn": dsn}, storage.MustNew(t.TempDir(), nil), activity.NewNoopClient(), zap.NewNop())
+	require.NoError(t, err)
+
+	tr, ok := to.AsTransporter(inputHandle, to)
+	require.True(t, ok)
 	err = tr.Transfer(ctx, map[string]any{"sql": "select * from all_data_types_table;", "db": dsn}, map[string]any{"table": "sink"}, &drivers.TransferOptions{})
 	require.NoError(t, err)
 	res, err := olap.Execute(context.Background(), &drivers.Statement{Query: "select count(*) from sink"})
@@ -123,3 +132,59 @@ func allMySQLDataTypesTest(t *testing.T, db *sql.DB, dsn string) {
 	require.NoError(t, res.Close())
 	require.NoError(t, to.Close())
+	require.NoError(t, inputHandle.Close())
 }
+
+// mysqlToDuckDB runs a mysql-to-duckdb model through the DuckDB model executor
+// and checks that the rows of all_data_types_table arrive in the "sink" table.
+func mysqlToDuckDB(t *testing.T, dsn string) {
+	duckDB, err := drivers.Open("duckdb", "default", map[string]any{"data_dir": t.TempDir()}, storage.MustNew(t.TempDir(), nil), activity.NewNoopClient(), zap.NewNop())
+	require.NoError(t, err)
+
+	inputHandle, err := drivers.Open("mysql", "default", map[string]any{"dsn": dsn}, storage.MustNew(t.TempDir(), nil), activity.NewNoopClient(), zap.NewNop())
+	require.NoError(t, err)
+
+	opts := &drivers.ModelExecutorOptions{
+		InputHandle:     inputHandle,
+		InputConnector:  "mysql",
+		OutputHandle:    duckDB,
+		OutputConnector: "duckdb",
+		Env: &drivers.ModelEnv{
+			AllowHostAccess: false,
+			StageChanges:    true,
+		},
+		PreliminaryInputProperties: map[string]any{
+			"sql": "SELECT * FROM all_data_types_table;",
+			"dsn": dsn,
+		},
+		PreliminaryOutputProperties: map[string]any{
+			"table": "sink",
+		},
+	}
+
+	me, ok := duckDB.AsModelExecutor("default", opts)
+	require.True(t, ok)
+
+	execOpts := &drivers.ModelExecuteOptions{
+		ModelExecutorOptions: opts,
+		InputProperties:      opts.PreliminaryInputProperties,
+		OutputProperties:     opts.PreliminaryOutputProperties,
+	}
+	_, err = me.Execute(context.Background(), execOpts)
+	require.NoError(t, err)
+
+	olap, ok := duckDB.AsOLAP("default")
+	require.True(t, ok)
+
+	res, err := olap.Execute(context.Background(), &drivers.Statement{Query: "select count(*) from sink"})
+	require.NoError(t, err)
+	for res.Next() {
+		var count int
+		err = res.Rows.Scan(&count)
+		require.NoError(t, err)
+		require.Equal(t, 2, count)
+	}
+	require.NoError(t, res.Close())
+	// TODO : verify this is a table once information_schema is fixed
+	require.NoError(t, duckDB.Close())
+	require.NoError(t, inputHandle.Close())
+}
diff --git a/runtime/drivers/duckdb/transporter_postgres_to_duckDB_test.go b/runtime/drivers/duckdb/transporter_postgres_to_duckDB_test.go
index 07615f425e9..633319696de 100644
--- a/runtime/drivers/duckdb/transporter_postgres_to_duckDB_test.go
+++ 
b/runtime/drivers/duckdb/transporter_postgres_to_duckDB_test.go
@@ -1,4 +1,4 @@
-package duckdb
+package duckdb_test
 
 import (
 	"context"
@@ -14,6 +14,7 @@ import (
 
 	// Load postgres driver
 	_ "github.com/jackc/pgx/v5/stdlib"
+	_ "github.com/rilldata/rill/runtime/drivers/duckdb"
 	_ "github.com/rilldata/rill/runtime/drivers/postgres"
 )
 
@@ -61,6 +62,7 @@ func TestTransfer(t *testing.T) {
 	defer db.Close()
 
 	t.Run("AllDataTypes", func(t *testing.T) { allDataTypesTest(t, db, pg.DatabaseURL) })
+	t.Run("model_executor_postgres_to_duckDB", func(t *testing.T) { pgxToDuckDB(t, db, pg.DatabaseURL) })
 }
 
 func allDataTypesTest(t *testing.T, db *sql.DB, dbURL string) {
@@ -72,7 +74,12 @@ func allDataTypesTest(t *testing.T, db *sql.DB, dbURL string) {
 	require.NoError(t, err)
 	olap, _ := to.AsOLAP("")
 
-	tr := newDuckDBToDuckDB(to.(*connection), "postgres", zap.NewNop())
+	inputHandle, err := drivers.Open("postgres", "default", map[string]any{"database_url": dbURL}, storage.MustNew(t.TempDir(), nil), activity.NewNoopClient(), zap.NewNop())
+	require.NoError(t, err)
+
+	tr, ok := to.AsTransporter(inputHandle, to)
+	require.True(t, ok)
+
 	err = tr.Transfer(ctx, map[string]any{"sql": "select * from all_datatypes;", "db": dbURL}, map[string]any{"table": "sink"}, &drivers.TransferOptions{})
 	require.NoError(t, err)
 	res, err := olap.Execute(context.Background(), &drivers.Statement{Query: "select count(*) from sink"})
@@ -86,3 +93,85 @@ func allDataTypesTest(t *testing.T, db *sql.DB, dbURL string) {
 	require.NoError(t, res.Close())
 	require.NoError(t, to.Close())
+	require.NoError(t, inputHandle.Close())
 }
+
+// pgxToDuckDB runs a postgres-to-duckdb model through the DuckDB model
+// executor, first as a full run and then as an incremental run after the
+// source table has changed, and checks the row count of "sink" each time.
+func pgxToDuckDB(t *testing.T, pgdb *sql.DB, dbURL string) {
+	duckDB, err := drivers.Open("duckdb", "default", map[string]any{"data_dir": t.TempDir()}, storage.MustNew(t.TempDir(), nil), activity.NewNoopClient(), zap.NewNop())
+	require.NoError(t, err)
+
+	inputHandle, err := drivers.Open("postgres", "default", map[string]any{"database_url": dbURL}, storage.MustNew(t.TempDir(), nil), activity.NewNoopClient(), zap.NewNop())
+	require.NoError(t, err)
+
+	opts := &drivers.ModelExecutorOptions{
+		InputHandle:     inputHandle,
+		InputConnector:  "postgres",
+		OutputHandle:    duckDB,
+		OutputConnector: "duckdb",
+		Env: &drivers.ModelEnv{
+			AllowHostAccess: false,
+			StageChanges:    true,
+		},
+		PreliminaryInputProperties: map[string]any{
+			"sql": "SELECT * FROM all_datatypes;",
+			"dsn": dbURL,
+		},
+		PreliminaryOutputProperties: map[string]any{
+			"table": "sink",
+		},
+	}
+
+	me, ok := duckDB.AsModelExecutor("default", opts)
+	require.True(t, ok)
+
+	execOpts := &drivers.ModelExecuteOptions{
+		ModelExecutorOptions: opts,
+		InputProperties:      opts.PreliminaryInputProperties,
+		OutputProperties:     opts.PreliminaryOutputProperties,
+	}
+
+	_, err = me.Execute(context.Background(), execOpts)
+	require.NoError(t, err)
+
+	olap, ok := duckDB.AsOLAP("default")
+	require.True(t, ok)
+
+	res, err := olap.Execute(context.Background(), &drivers.Statement{Query: "select count(*) from sink"})
+	require.NoError(t, err)
+	for res.Next() {
+		var count int
+		err = res.Rows.Scan(&count)
+		require.NoError(t, err)
+		require.Equal(t, 1, count)
+	}
+	require.NoError(t, res.Close())
+
+	// ingest some more data in postgres
+	_, err = pgdb.Exec("INSERT INTO all_datatypes(uuid, created_at) VALUES (gen_random_uuid(), '2024-01-02 12:46:55');")
+	require.NoError(t, err)
+
+	// drop older data from postgres
+	_, err = pgdb.Exec("DELETE FROM all_datatypes WHERE created_at < '2024-01-01 00:00:00';")
+	require.NoError(t, err)
+
+	// incremental run
+	execOpts.IncrementalRun = true
+	execOpts.InputProperties["sql"] = "SELECT * FROM all_datatypes WHERE created_at > '2024-01-01 00:00:00';"
+	_, err = me.Execute(context.Background(), execOpts)
+	require.NoError(t, err)
+
+	res, err = olap.Execute(context.Background(), &drivers.Statement{Query: "select count(*) from sink"})
+	require.NoError(t, err)
+	for res.Next() {
+		var count int
+		err = res.Rows.Scan(&count)
+		require.NoError(t, err)
+		require.Equal(t, 2, count)
+	}
+	require.NoError(t, res.Close())
+
+	require.NoError(t, duckDB.Close())
+	require.NoError(t, inputHandle.Close())
+}
diff --git a/runtime/drivers/mysql/mysql.go b/runtime/drivers/mysql/mysql.go
index 1e0a6d22947..7253ea48373 100644
--- a/runtime/drivers/mysql/mysql.go
+++ b/runtime/drivers/mysql/mysql.go
@@ -58,6 +58,12 @@ var spec = drivers.Spec{
 
 type driver struct{}
 
+// ConfigProperties describes the connector configuration of the mysql driver.
+type ConfigProperties struct {
+	// DSN is the connection string of the MySQL database.
+	DSN string `mapstructure:"dsn"`
+}
+
 func (d driver) Open(instanceID string, config map[string]any, st *storage.Client, ac *activity.Client, logger *zap.Logger) (drivers.Handle, error) {
 	if instanceID == "" {
 		return nil, errors.New("mysql driver can't be shared")
diff --git a/runtime/drivers/postgres/postgres.go b/runtime/drivers/postgres/postgres.go
index a34ce1531ba..948ae5c44cc 100644
--- a/runtime/drivers/postgres/postgres.go
+++ b/runtime/drivers/postgres/postgres.go
@@ -56,6 +56,22 @@ var spec = drivers.Spec{
 
 type driver struct{}
 
+// ConfigProperties describes the connector configuration of the postgres driver.
+type ConfigProperties struct {
+	// DatabaseURL is the connection string; it is used when DSN is empty.
+	DatabaseURL string `mapstructure:"database_url"`
+	// DSN is the connection string; it takes precedence over DatabaseURL.
+	DSN string `mapstructure:"dsn"`
+}
+
+// ResolveDSN returns DSN when it is set and DatabaseURL otherwise.
+func (c *ConfigProperties) ResolveDSN() string {
+	if c.DSN != "" {
+		return c.DSN
+	}
+	return c.DatabaseURL
+}
+
 func (d driver) Open(instanceID string, config map[string]any, st *storage.Client, ac *activity.Client, logger *zap.Logger) (drivers.Handle, error) {
 	if instanceID == "" {
 		return nil, errors.New("postgres driver can't be shared")