From fdad4369d44c2fe66a17940bf015f36bdbaf5ca0 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Tue, 8 Aug 2023 11:46:32 -0400 Subject: [PATCH 1/3] feat: sqlite vacuum and optional migrations --- cmd/waku/flags.go | 13 +++++++++++++ cmd/waku/main.go | 2 ++ cmd/waku/node.go | 18 +++++++++++++----- cmd/waku/options.go | 2 ++ cmd/waku/rest/utils_test.go | 2 +- library/node.go | 2 +- waku/persistence/sqlite/sqlite.go | 14 +++++++++++++- waku/persistence/utils/db.go | 12 ++++++++++-- waku/v2/node/connectedness_test.go | 2 +- waku/v2/node/wakunode2_test.go | 2 +- waku/v2/protocol/store/utils_test.go | 2 +- waku/v2/rendezvous/rendezvous_test.go | 2 +- 12 files changed, 59 insertions(+), 14 deletions(-) diff --git a/cmd/waku/flags.go b/cmd/waku/flags.go index cef87e8cd..81ea16758 100644 --- a/cmd/waku/flags.go +++ b/cmd/waku/flags.go @@ -288,6 +288,19 @@ var ( Destination: &options.Store.DatabaseURL, EnvVars: []string{"WAKUNODE2_STORE_MESSAGE_DB_URL"}, }) + StoreMessageDBVacuum = altsrc.NewBoolFlag(&cli.BoolFlag{ + Name: "store-message-db-vacuum", + Usage: "Enable database vacuuming at start. Only supported by SQLite database engine.", + Destination: &options.Store.Vacuum, + EnvVars: []string{"WAKUNODE2_STORE_MESSAGE_DB_VACUUM"}, + }) + StoreMessageDBMigration = altsrc.NewBoolFlag(&cli.BoolFlag{ + Name: "store-message-db-migration", + Usage: "Enable database migration at start.", + Destination: &options.Store.Migration, + Value: true, + EnvVars: []string{"WAKUNODE2_STORE_MESSAGE_DB_MIGRATION"}, + }) StoreResumePeer = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{ Name: "store-resume-peer", Usage: "Peer multiaddress to resume the message store at boot. Option may be repeated", diff --git a/cmd/waku/main.go b/cmd/waku/main.go index 805e620d5..b84c66a92 100644 --- a/cmd/waku/main.go +++ b/cmd/waku/main.go @@ -56,6 +56,8 @@ func main() { StoreMessageDBURL, StoreMessageRetentionTime, StoreMessageRetentionCapacity, + StoreMessageDBVacuum, + StoreMessageDBMigration, StoreResumePeer, FilterFlag, FilterNode, diff --git a/cmd/waku/node.go b/cmd/waku/node.go index 6035353c1..f9d779db4 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -96,8 +96,11 @@ func Execute(options NodeOptions) { var db *sql.DB var migrationFn func(*sql.DB) error - if requiresDB(options) { - db, migrationFn, err = dbutils.ExtractDBAndMigration(options.Store.DatabaseURL) + if requiresDB(options) && options.Store.Migration { + dbSettings := dbutils.DBSettings{ + SQLiteVacuum: options.Store.Vacuum, + } + db, migrationFn, err = dbutils.ExtractDBAndMigration(options.Store.DatabaseURL, dbSettings, logger) failOnErr(err, "Could not connect to DB") } @@ -211,11 +214,16 @@ func Execute(options NodeOptions) { var dbStore *persistence.DBStore if requiresDB(options) { - dbStore, err = persistence.NewDBStore(logger, + dbOptions := []persistence.DBOption{ persistence.WithDB(db), - persistence.WithMigrations(migrationFn), // TODO: refactor migrations out of DBStore, or merge DBStore with rendezvous DB persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionTime), - ) + } + + if options.Store.Migration { + dbOptions = append(dbOptions, persistence.WithMigrations(migrationFn)) // TODO: refactor migrations out of DBStore, or merge DBStore with rendezvous DB + } + + dbStore, err = persistence.NewDBStore(logger, dbOptions...) failOnErr(err, "DBStore") nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore)) } diff --git a/cmd/waku/options.go b/cmd/waku/options.go index c5a23e611..d21a99301 100644 --- a/cmd/waku/options.go +++ b/cmd/waku/options.go @@ -81,6 +81,8 @@ type StoreOptions struct { RetentionMaxMessages int ResumeNodes []multiaddr.Multiaddr Nodes []multiaddr.Multiaddr + Vacuum bool + Migration bool } // DNSDiscoveryOptions are settings used for enabling DNS-based discovery diff --git a/cmd/waku/rest/utils_test.go b/cmd/waku/rest/utils_test.go index fe3fdd185..020eccb46 100644 --- a/cmd/waku/rest/utils_test.go +++ b/cmd/waku/rest/utils_test.go @@ -12,7 +12,7 @@ import ( func MemoryDB(t *testing.T) *persistence.DBStore { var db *sql.DB - db, migration, err := sqlite.NewDB(":memory:") + db, migration, err := sqlite.NewDB(":memory:", false, utils.Logger()) require.NoError(t, err) dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration)) diff --git a/library/node.go b/library/node.go index e8352c9a2..fb1accfd7 100644 --- a/library/node.go +++ b/library/node.go @@ -131,7 +131,7 @@ func NewNode(configJSON string) error { if *config.EnableStore { var db *sql.DB var migrationFn func(*sql.DB) error - db, migrationFn, err = dbutils.ExtractDBAndMigration(*config.DatabaseURL) + db, migrationFn, err = dbutils.ExtractDBAndMigration(*config.DatabaseURL, dbutils.DBSettings{SQLiteVacuum: true}, utils.Logger()) if err != nil { return err } diff --git a/waku/persistence/sqlite/sqlite.go b/waku/persistence/sqlite/sqlite.go index 947e9d56f..675d7ee13 100644 --- a/waku/persistence/sqlite/sqlite.go +++ b/waku/persistence/sqlite/sqlite.go @@ -11,6 +11,7 @@ import ( "github.com/waku-org/go-waku/waku/persistence" "github.com/waku-org/go-waku/waku/persistence/migrate" "github.com/waku-org/go-waku/waku/persistence/sqlite/migrations" + "go.uber.org/zap" ) func addSqliteURLDefaults(dburl string) string { @@ -56,7 +57,7 @@ func WithDB(dburl string, migrate bool) persistence.DBOption { } // NewDB creates a sqlite3 DB in the specified path -func NewDB(dburl string) (*sql.DB, func(*sql.DB) error, error) { +func NewDB(dburl string, shouldVacuum bool, logger *zap.Logger) (*sql.DB, func(*sql.DB) error, error) { db, err := sql.Open("sqlite3", addSqliteURLDefaults(dburl)) if err != nil { return nil, nil, err @@ -65,6 +66,17 @@ func NewDB(dburl string) (*sql.DB, func(*sql.DB) error, error) { // Disable concurrent access as not supported by the driver db.SetMaxOpenConns(1) + logger.Info("starting sqlite database vacuuming") + + if shouldVacuum { + _, err := db.Exec("VACUUM") + if err != nil { + return nil, nil, err + } + } + + logger.Info("finished sqlite database vacuuming") + return db, Migrate, nil } diff --git a/waku/persistence/utils/db.go b/waku/persistence/utils/db.go index 5d58ee622..2832cf990 100644 --- a/waku/persistence/utils/db.go +++ b/waku/persistence/utils/db.go @@ -8,6 +8,7 @@ import ( "github.com/waku-org/go-waku/waku/persistence/postgres" "github.com/waku-org/go-waku/waku/persistence/sqlite" + "go.uber.org/zap" ) func validateDBUrl(val string) error { @@ -18,12 +19,19 @@ func validateDBUrl(val string) error { return nil } +// DBSettings hold db specific configuration settings required during the db initialization +type DBSettings struct { + SQLiteVacuum bool +} + // ExtractDBAndMigration will return a database connection, and migration function that should be used depending on a database connection string -func ExtractDBAndMigration(databaseURL string) (*sql.DB, func(*sql.DB) error, error) { +func ExtractDBAndMigration(databaseURL string, dbSettings DBSettings, logger *zap.Logger) (*sql.DB, func(*sql.DB) error, error) { var db *sql.DB var migrationFn func(*sql.DB) error var err error + logger = logger.Named("db-setup") + dbURL := "" if databaseURL != "" { err := validateDBUrl(databaseURL) @@ -41,7 +49,7 @@ func ExtractDBAndMigration(databaseURL string) (*sql.DB, func(*sql.DB) error, er dbParams := dbURLParts[1] switch dbEngine { case "sqlite3": - db, migrationFn, err = sqlite.NewDB(dbParams) + db, migrationFn, err = sqlite.NewDB(dbParams, dbSettings.SQLiteVacuum, logger) case "postgresql": db, migrationFn, err = postgres.NewDB(dbURL) default: diff --git a/waku/v2/node/connectedness_test.go b/waku/v2/node/connectedness_test.go index a1b8a57f5..3a1998196 100644 --- a/waku/v2/node/connectedness_test.go +++ b/waku/v2/node/connectedness_test.go @@ -69,7 +69,7 @@ func TestConnectionStatusChanges(t *testing.T) { err = node2.Start(ctx) require.NoError(t, err) - db, migration, err := sqlite.NewDB(":memory:") + db, migration, err := sqlite.NewDB(":memory:", false, utils.Logger()) require.NoError(t, err) dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration)) require.NoError(t, err) diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go index b7c48a150..a38cd9789 100644 --- a/waku/v2/node/wakunode2_test.go +++ b/waku/v2/node/wakunode2_test.go @@ -230,7 +230,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) { subs.Unsubscribe() // NODE2: Filter Client/Store - db, migration, err := sqlite.NewDB(":memory:") + db, migration, err := sqlite.NewDB(":memory:", false, utils.Logger()) require.NoError(t, err) dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration)) require.NoError(t, err) diff --git a/waku/v2/protocol/store/utils_test.go b/waku/v2/protocol/store/utils_test.go index 24121b3c6..b9914bf4a 100644 --- a/waku/v2/protocol/store/utils_test.go +++ b/waku/v2/protocol/store/utils_test.go @@ -12,7 +12,7 @@ import ( func MemoryDB(t *testing.T) *persistence.DBStore { var db *sql.DB - db, migration, err := sqlite.NewDB(":memory:") + db, migration, err := sqlite.NewDB(":memory:", false, utils.Logger()) require.NoError(t, err) dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration)) diff --git a/waku/v2/rendezvous/rendezvous_test.go b/waku/v2/rendezvous/rendezvous_test.go index 671d5a095..b0350730e 100644 --- a/waku/v2/rendezvous/rendezvous_test.go +++ b/waku/v2/rendezvous/rendezvous_test.go @@ -47,7 +47,7 @@ func TestRendezvous(t *testing.T) { require.NoError(t, err) var db *sql.DB - db, migration, err := sqlite.NewDB(":memory:") + db, migration, err := sqlite.NewDB(":memory:", false, utils.Logger()) require.NoError(t, err) err = migration(db) From 8188f18fb937f9b951ec6335592ede4e988e8185 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 9 Aug 2023 13:14:02 -0400 Subject: [PATCH 2/3] feat: postgresql vacuum --- cmd/waku/flags.go | 2 +- cmd/waku/node.go | 2 +- library/node.go | 2 +- waku/persistence/postgres/postgres.go | 22 ++++++++++++++++++++-- waku/persistence/sqlite/sqlite.go | 16 +++++++++++----- waku/persistence/utils/db.go | 6 +++--- 6 files changed, 37 insertions(+), 13 deletions(-) diff --git a/cmd/waku/flags.go b/cmd/waku/flags.go index 81ea16758..053d6d9b1 100644 --- a/cmd/waku/flags.go +++ b/cmd/waku/flags.go @@ -290,7 +290,7 @@ var ( }) StoreMessageDBVacuum = altsrc.NewBoolFlag(&cli.BoolFlag{ Name: "store-message-db-vacuum", - Usage: "Enable database vacuuming at start. Only supported by SQLite database engine.", + Usage: "Enable database vacuuming at start.", Destination: &options.Store.Vacuum, EnvVars: []string{"WAKUNODE2_STORE_MESSAGE_DB_VACUUM"}, }) diff --git a/cmd/waku/node.go b/cmd/waku/node.go index f9d779db4..ccfbd3b8d 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -98,7 +98,7 @@ func Execute(options NodeOptions) { var migrationFn func(*sql.DB) error if requiresDB(options) && options.Store.Migration { dbSettings := dbutils.DBSettings{ - SQLiteVacuum: options.Store.Vacuum, + Vacuum: options.Store.Vacuum, } db, migrationFn, err = dbutils.ExtractDBAndMigration(options.Store.DatabaseURL, dbSettings, logger) failOnErr(err, "Could not connect to DB") diff --git a/library/node.go b/library/node.go index fb1accfd7..74aef9742 100644 --- a/library/node.go +++ b/library/node.go @@ -131,7 +131,7 @@ func NewNode(configJSON string) error { if *config.EnableStore { var db *sql.DB var migrationFn func(*sql.DB) error - db, migrationFn, err = dbutils.ExtractDBAndMigration(*config.DatabaseURL, dbutils.DBSettings{SQLiteVacuum: true}, utils.Logger()) + db, migrationFn, err = dbutils.ExtractDBAndMigration(*config.DatabaseURL, dbutils.DBSettings{Vacuum: true}, utils.Logger()) if err != nil { return err } diff --git a/waku/persistence/postgres/postgres.go b/waku/persistence/postgres/postgres.go index f87ff141d..49ffd671d 100644 --- a/waku/persistence/postgres/postgres.go +++ b/waku/persistence/postgres/postgres.go @@ -10,10 +10,11 @@ import ( "github.com/waku-org/go-waku/waku/persistence" "github.com/waku-org/go-waku/waku/persistence/migrate" "github.com/waku-org/go-waku/waku/persistence/postgres/migrations" + "go.uber.org/zap" ) // WithDB is a DBOption that lets you use a postgresql DBStore and run migrations -func WithDB(dburl string, migrate bool) persistence.DBOption { +func WithDB(dburl string, migrate bool, shouldVacuum bool) persistence.DBOption { return func(d *persistence.DBStore) error { driverOption := persistence.WithDriver("pgx", dburl) err := driverOption(d) @@ -35,13 +36,30 @@ func WithDB(dburl string, migrate bool) persistence.DBOption { } } +func executeVacuum(db *sql.DB, logger *zap.Logger) error { + logger.Info("starting PostgreSQL database vacuuming") + _, err := db.Exec("VACUUM") + if err != nil { + return err + } + logger.Info("finished PostgreSQL database vacuuming") + return nil +} + // NewDB connects to postgres DB in the specified path -func NewDB(dburl string) (*sql.DB, func(*sql.DB) error, error) { +func NewDB(dburl string, shouldVacuum bool, logger *zap.Logger) (*sql.DB, func(*sql.DB) error, error) { db, err := sql.Open("pgx", dburl) if err != nil { return nil, nil, err } + if shouldVacuum { + err := executeVacuum(db, logger) + if err != nil { + return nil, nil, err + } + } + return db, Migrate, nil } diff --git a/waku/persistence/sqlite/sqlite.go b/waku/persistence/sqlite/sqlite.go index 675d7ee13..517dbcaee 100644 --- a/waku/persistence/sqlite/sqlite.go +++ b/waku/persistence/sqlite/sqlite.go @@ -56,6 +56,16 @@ func WithDB(dburl string, migrate bool) persistence.DBOption { } } +func executeVacuum(db *sql.DB, logger *zap.Logger) error { + logger.Info("starting sqlite database vacuuming") + _, err := db.Exec("VACUUM") + if err != nil { + return err + } + logger.Info("finished sqlite database vacuuming") + return nil +} + // NewDB creates a sqlite3 DB in the specified path func NewDB(dburl string, shouldVacuum bool, logger *zap.Logger) (*sql.DB, func(*sql.DB) error, error) { db, err := sql.Open("sqlite3", addSqliteURLDefaults(dburl)) @@ -66,17 +76,13 @@ func NewDB(dburl string, shouldVacuum bool, logger *zap.Logger) (*sql.DB, func(* // Disable concurrent access as not supported by the driver db.SetMaxOpenConns(1) - logger.Info("starting sqlite database vacuuming") - if shouldVacuum { - _, err := db.Exec("VACUUM") + err := executeVacuum(db, logger) if err != nil { return nil, nil, err } } - logger.Info("finished sqlite database vacuuming") - return db, Migrate, nil } diff --git a/waku/persistence/utils/db.go b/waku/persistence/utils/db.go index 2832cf990..94593258b 100644 --- a/waku/persistence/utils/db.go +++ b/waku/persistence/utils/db.go @@ -21,7 +21,7 @@ func validateDBUrl(val string) error { // DBSettings hold db specific configuration settings required during the db initialization type DBSettings struct { - SQLiteVacuum bool + Vacuum bool } // ExtractDBAndMigration will return a database connection, and migration function that should be used depending on a database connection string @@ -49,9 +49,9 @@ func ExtractDBAndMigration(databaseURL string, dbSettings DBSettings, logger *za dbParams := dbURLParts[1] switch dbEngine { case "sqlite3": - db, migrationFn, err = sqlite.NewDB(dbParams, dbSettings.SQLiteVacuum, logger) + db, migrationFn, err = sqlite.NewDB(dbParams, dbSettings.Vacuum, logger) case "postgresql": - db, migrationFn, err = postgres.NewDB(dbURL) + db, migrationFn, err = postgres.NewDB(dbURL, dbSettings.Vacuum, logger) default: err = errors.New("unsupported database engine") } From d794146c535581015d626ad2632c500137b1e5a6 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 10 Aug 2023 09:40:08 -0400 Subject: [PATCH 3/3] fix: postgresql VACUUM FULL is the equivalent to sqlite VACUUM --- waku/persistence/postgres/postgres.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/persistence/postgres/postgres.go b/waku/persistence/postgres/postgres.go index 49ffd671d..7ce66f962 100644 --- a/waku/persistence/postgres/postgres.go +++ b/waku/persistence/postgres/postgres.go @@ -38,7 +38,7 @@ func WithDB(dburl string, migrate bool, shouldVacuum bool) persistence.DBOption func executeVacuum(db *sql.DB, logger *zap.Logger) error { logger.Info("starting PostgreSQL database vacuuming") - _, err := db.Exec("VACUUM") + _, err := db.Exec("VACUUM FULL") if err != nil { return err }