Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sqlite vacuum and optional migrations #633

Merged
merged 3 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions cmd/waku/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,19 @@ var (
Destination: &options.Store.DatabaseURL,
EnvVars: []string{"WAKUNODE2_STORE_MESSAGE_DB_URL"},
})
StoreMessageDBVacuum = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "store-message-db-vacuum",
Usage: "Enable database vacuuming at start.",
Destination: &options.Store.Vacuum,
EnvVars: []string{"WAKUNODE2_STORE_MESSAGE_DB_VACUUM"},
})
StoreMessageDBMigration = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "store-message-db-migration",
Usage: "Enable database migration at start.",
Destination: &options.Store.Migration,
Value: true,
EnvVars: []string{"WAKUNODE2_STORE_MESSAGE_DB_MIGRATION"},
})
StoreResumePeer = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{
Name: "store-resume-peer",
Usage: "Peer multiaddress to resume the message store at boot. Option may be repeated",
Expand Down
2 changes: 2 additions & 0 deletions cmd/waku/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func main() {
StoreMessageDBURL,
StoreMessageRetentionTime,
StoreMessageRetentionCapacity,
StoreMessageDBVacuum,
StoreMessageDBMigration,
StoreResumePeer,
FilterFlag,
FilterNode,
Expand Down
18 changes: 13 additions & 5 deletions cmd/waku/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,11 @@ func Execute(options NodeOptions) {

var db *sql.DB
var migrationFn func(*sql.DB) error
if requiresDB(options) {
db, migrationFn, err = dbutils.ExtractDBAndMigration(options.Store.DatabaseURL)
if requiresDB(options) && options.Store.Migration {
dbSettings := dbutils.DBSettings{
Vacuum: options.Store.Vacuum,
}
db, migrationFn, err = dbutils.ExtractDBAndMigration(options.Store.DatabaseURL, dbSettings, logger)
failOnErr(err, "Could not connect to DB")
}

Expand Down Expand Up @@ -211,11 +214,16 @@ func Execute(options NodeOptions) {

var dbStore *persistence.DBStore
if requiresDB(options) {
dbStore, err = persistence.NewDBStore(logger,
dbOptions := []persistence.DBOption{
persistence.WithDB(db),
persistence.WithMigrations(migrationFn), // TODO: refactor migrations out of DBStore, or merge DBStore with rendezvous DB
persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionTime),
)
}

if options.Store.Migration {
dbOptions = append(dbOptions, persistence.WithMigrations(migrationFn)) // TODO: refactor migrations out of DBStore, or merge DBStore with rendezvous DB
}

dbStore, err = persistence.NewDBStore(logger, dbOptions...)
failOnErr(err, "DBStore")
nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore))
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/waku/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ type StoreOptions struct {
RetentionMaxMessages int
ResumeNodes []multiaddr.Multiaddr
Nodes []multiaddr.Multiaddr
Vacuum bool
Migration bool
}

// DNSDiscoveryOptions are settings used for enabling DNS-based discovery
Expand Down
2 changes: 1 addition & 1 deletion cmd/waku/rest/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func MemoryDB(t *testing.T) *persistence.DBStore {
var db *sql.DB
db, migration, err := sqlite.NewDB(":memory:")
db, migration, err := sqlite.NewDB(":memory:", false, utils.Logger())
require.NoError(t, err)

dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration))
Expand Down
2 changes: 1 addition & 1 deletion library/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func NewNode(configJSON string) error {
if *config.EnableStore {
var db *sql.DB
var migrationFn func(*sql.DB) error
db, migrationFn, err = dbutils.ExtractDBAndMigration(*config.DatabaseURL)
db, migrationFn, err = dbutils.ExtractDBAndMigration(*config.DatabaseURL, dbutils.DBSettings{Vacuum: true}, utils.Logger())
if err != nil {
return err
}
Expand Down
22 changes: 20 additions & 2 deletions waku/persistence/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (
"github.com/waku-org/go-waku/waku/persistence"
"github.com/waku-org/go-waku/waku/persistence/migrate"
"github.com/waku-org/go-waku/waku/persistence/postgres/migrations"
"go.uber.org/zap"
)

// WithDB is a DBOption that lets you use a postgresql DBStore and run migrations
func WithDB(dburl string, migrate bool) persistence.DBOption {
func WithDB(dburl string, migrate bool, shouldVacuum bool) persistence.DBOption {
return func(d *persistence.DBStore) error {
driverOption := persistence.WithDriver("pgx", dburl)
err := driverOption(d)
Expand All @@ -35,13 +36,30 @@ func WithDB(dburl string, migrate bool) persistence.DBOption {
}
}

func executeVacuum(db *sql.DB, logger *zap.Logger) error {
logger.Info("starting PostgreSQL database vacuuming")
_, err := db.Exec("VACUUM FULL")
if err != nil {
return err
}
logger.Info("finished PostgreSQL database vacuuming")
return nil
}

// NewDB connects to postgres DB in the specified path
func NewDB(dburl string) (*sql.DB, func(*sql.DB) error, error) {
func NewDB(dburl string, shouldVacuum bool, logger *zap.Logger) (*sql.DB, func(*sql.DB) error, error) {
db, err := sql.Open("pgx", dburl)
if err != nil {
return nil, nil, err
}

if shouldVacuum {
err := executeVacuum(db, logger)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering, since vacuuming of db can take time upto minutes.
Should this be a bocking call or should we start this process in a go-routine and report the result back in async way?
Do we have any test indicating how much time a vacuum takes for our store db?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading how SQLite VACUUM documentation, it will prevent writes to be executed, so it should be a blocking call.
No tests, but this command works by creating a temporary copy of the db, so the time it will take depends on the amount of data stored in the DB.

For postgresql, I modified the PR to use VACUUM FULL, just so the behavior of the command is the same as SQLite, but I think we shouldnt expect people to use vacuum with PostgreSQL since it is possible to setup routine vacuuming in it https://www.postgresql.org/docs/current/routine-vacuuming.html#AUTOVACUUM

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, maybe the caller can do an async status to the user then. Rather than us doing it.
I am just concerned since this is an API call by the user, blocking it for soo long may not be a good idea.

But its ok for now, we can change it if we get user feedback.

if err != nil {
return nil, nil, err
}
}

return db, Migrate, nil
}

Expand Down
20 changes: 19 additions & 1 deletion waku/persistence/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/waku-org/go-waku/waku/persistence"
"github.com/waku-org/go-waku/waku/persistence/migrate"
"github.com/waku-org/go-waku/waku/persistence/sqlite/migrations"
"go.uber.org/zap"
)

func addSqliteURLDefaults(dburl string) string {
Expand Down Expand Up @@ -55,8 +56,18 @@ func WithDB(dburl string, migrate bool) persistence.DBOption {
}
}

func executeVacuum(db *sql.DB, logger *zap.Logger) error {
logger.Info("starting sqlite database vacuuming")
_, err := db.Exec("VACUUM")
if err != nil {
return err
}
logger.Info("finished sqlite database vacuuming")
return nil
}

// NewDB creates a sqlite3 DB in the specified path
func NewDB(dburl string) (*sql.DB, func(*sql.DB) error, error) {
func NewDB(dburl string, shouldVacuum bool, logger *zap.Logger) (*sql.DB, func(*sql.DB) error, error) {
db, err := sql.Open("sqlite3", addSqliteURLDefaults(dburl))
if err != nil {
return nil, nil, err
Expand All @@ -65,6 +76,13 @@ func NewDB(dburl string) (*sql.DB, func(*sql.DB) error, error) {
// Disable concurrent access as not supported by the driver
db.SetMaxOpenConns(1)

if shouldVacuum {
err := executeVacuum(db, logger)
if err != nil {
return nil, nil, err
}
}

return db, Migrate, nil
}

Expand Down
14 changes: 11 additions & 3 deletions waku/persistence/utils/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/waku-org/go-waku/waku/persistence/postgres"
"github.com/waku-org/go-waku/waku/persistence/sqlite"
"go.uber.org/zap"
)

func validateDBUrl(val string) error {
Expand All @@ -18,12 +19,19 @@ func validateDBUrl(val string) error {
return nil
}

// DBSettings hold db specific configuration settings required during the db initialization
type DBSettings struct {
Vacuum bool
}

// ExtractDBAndMigration will return a database connection, and migration function that should be used depending on a database connection string
func ExtractDBAndMigration(databaseURL string) (*sql.DB, func(*sql.DB) error, error) {
func ExtractDBAndMigration(databaseURL string, dbSettings DBSettings, logger *zap.Logger) (*sql.DB, func(*sql.DB) error, error) {
var db *sql.DB
var migrationFn func(*sql.DB) error
var err error

logger = logger.Named("db-setup")

dbURL := ""
if databaseURL != "" {
err := validateDBUrl(databaseURL)
Expand All @@ -41,9 +49,9 @@ func ExtractDBAndMigration(databaseURL string) (*sql.DB, func(*sql.DB) error, er
dbParams := dbURLParts[1]
switch dbEngine {
case "sqlite3":
db, migrationFn, err = sqlite.NewDB(dbParams)
db, migrationFn, err = sqlite.NewDB(dbParams, dbSettings.Vacuum, logger)
case "postgresql":
db, migrationFn, err = postgres.NewDB(dbURL)
db, migrationFn, err = postgres.NewDB(dbURL, dbSettings.Vacuum, logger)
default:
err = errors.New("unsupported database engine")
}
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/node/connectedness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestConnectionStatusChanges(t *testing.T) {
err = node2.Start(ctx)
require.NoError(t, err)

db, migration, err := sqlite.NewDB(":memory:")
db, migration, err := sqlite.NewDB(":memory:", false, utils.Logger())
require.NoError(t, err)
dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration))
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/node/wakunode2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
subs.Unsubscribe()

// NODE2: Filter Client/Store
db, migration, err := sqlite.NewDB(":memory:")
db, migration, err := sqlite.NewDB(":memory:", false, utils.Logger())
require.NoError(t, err)
dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration))
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/store/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func MemoryDB(t *testing.T) *persistence.DBStore {
var db *sql.DB
db, migration, err := sqlite.NewDB(":memory:")
db, migration, err := sqlite.NewDB(":memory:", false, utils.Logger())
require.NoError(t, err)

dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration))
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/rendezvous/rendezvous_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestRendezvous(t *testing.T) {
require.NoError(t, err)

var db *sql.DB
db, migration, err := sqlite.NewDB(":memory:")
db, migration, err := sqlite.NewDB(":memory:", false, utils.Logger())
require.NoError(t, err)

err = migration(db)
Expand Down