Skip to content

Commit

Permalink
feat: sqlite vacuum and optional migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Aug 8, 2023
1 parent df78d21 commit e2b2662
Show file tree
Hide file tree
Showing 12 changed files with 59 additions and 14 deletions.
13 changes: 13 additions & 0 deletions cmd/waku/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,19 @@ var (
Destination: &options.Store.DatabaseURL,
EnvVars: []string{"WAKUNODE2_STORE_MESSAGE_DB_URL"},
})
StoreMessageDBVacuum = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "store-message-db-vacuum",
Usage: "Enable database vacuuming at start. Only supported by SQLite database engine.",
Destination: &options.Store.Vacuum,
EnvVars: []string{"WAKUNODE2_STORE_MESSAGE_DB_VACUUM"},
})
StoreMessageDBMigration = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "store-message-db-migration",
Usage: "Enable database migration at start.",
Destination: &options.Store.Migration,
Value: true,
EnvVars: []string{"WAKUNODE2_STORE_MESSAGE_DB_MIGRATION"},
})
StoreResumePeer = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{
Name: "store-resume-peer",
Usage: "Peer multiaddress to resume the message store at boot. Option may be repeated",
Expand Down
2 changes: 2 additions & 0 deletions cmd/waku/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func main() {
StoreMessageDBURL,
StoreMessageRetentionTime,
StoreMessageRetentionCapacity,
StoreMessageDBVacuum,
StoreMessageDBMigration,
StoreResumePeer,
FilterFlag,
FilterNode,
Expand Down
18 changes: 13 additions & 5 deletions cmd/waku/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,11 @@ func Execute(options Options) {

var db *sql.DB
var migrationFn func(*sql.DB) error
if requiresDB(options) {
db, migrationFn, err = dbutils.ExtractDBAndMigration(options.Store.DatabaseURL)
if requiresDB(options) && options.Store.Migration {
dbSettings := dbutils.DBSettings{
SQLiteVacuum: options.Store.Vacuum,
}
db, migrationFn, err = dbutils.ExtractDBAndMigration(options.Store.DatabaseURL, dbSettings, logger)
failOnErr(err, "Could not connect to DB")
}

Expand Down Expand Up @@ -219,11 +222,16 @@ func Execute(options Options) {

var dbStore *persistence.DBStore
if requiresDB(options) {
dbStore, err = persistence.NewDBStore(logger,
dbOptions := []persistence.DBOption{
persistence.WithDB(db),
persistence.WithMigrations(migrationFn), // TODO: refactor migrations out of DBStore, or merge DBStore with rendezvous DB
persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionTime),
)
}

if options.Store.Migration {
dbOptions = append(dbOptions, persistence.WithMigrations(migrationFn)) // TODO: refactor migrations out of DBStore, or merge DBStore with rendezvous DB
}

dbStore, err = persistence.NewDBStore(logger, dbOptions...)
failOnErr(err, "DBStore")
nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore))
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/waku/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ type StoreOptions struct {
RetentionMaxMessages int
ResumeNodes []multiaddr.Multiaddr
Nodes []multiaddr.Multiaddr
Vacuum bool
Migration bool
}

// DNSDiscoveryOptions are settings used for enabling DNS-based discovery
Expand Down
2 changes: 1 addition & 1 deletion cmd/waku/rest/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func MemoryDB(t *testing.T) *persistence.DBStore {
var db *sql.DB
db, migration, err := sqlite.NewDB(":memory:")
db, migration, err := sqlite.NewDB(":memory:", false, utils.Logger())
require.NoError(t, err)

dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration))
Expand Down
2 changes: 1 addition & 1 deletion mobile/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func NewNode(configJSON string) string {
if *config.EnableStore {
var db *sql.DB
var migrationFn func(*sql.DB) error
db, migrationFn, err = dbutils.ExtractDBAndMigration(*config.DatabaseURL)
db, migrationFn, err = dbutils.ExtractDBAndMigration(*config.DatabaseURL, dbutils.DBSettings{SQLiteVacuum: true}, utils.Logger())
if err != nil {
return MakeJSONResponse(err)
}
Expand Down
14 changes: 13 additions & 1 deletion waku/persistence/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/waku-org/go-waku/waku/persistence"
"github.com/waku-org/go-waku/waku/persistence/migrate"
"github.com/waku-org/go-waku/waku/persistence/sqlite/migrations"
"go.uber.org/zap"
)

func addSqliteURLDefaults(dburl string) string {
Expand Down Expand Up @@ -56,7 +57,7 @@ func WithDB(dburl string, migrate bool) persistence.DBOption {
}

// NewDB creates a sqlite3 DB in the specified path
func NewDB(dburl string) (*sql.DB, func(*sql.DB) error, error) {
func NewDB(dburl string, shouldVacuum bool, logger *zap.Logger) (*sql.DB, func(*sql.DB) error, error) {
db, err := sql.Open("sqlite3", addSqliteURLDefaults(dburl))
if err != nil {
return nil, nil, err
Expand All @@ -65,6 +66,17 @@ func NewDB(dburl string) (*sql.DB, func(*sql.DB) error, error) {
// Disable concurrent access as not supported by the driver
db.SetMaxOpenConns(1)

logger.Info("starting sqlite database vacuuming")

if shouldVacuum {
_, err := db.Exec("VACUUM")
if err != nil {
return nil, nil, err
}
}

logger.Info("finished sqlite database vacuuming")

return db, Migrate, nil
}

Expand Down
12 changes: 10 additions & 2 deletions waku/persistence/utils/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/waku-org/go-waku/waku/persistence/postgres"
"github.com/waku-org/go-waku/waku/persistence/sqlite"
"go.uber.org/zap"
)

func validateDBUrl(val string) error {
Expand All @@ -18,12 +19,19 @@ func validateDBUrl(val string) error {
return nil
}

// DBSettings hold db specific configuration settings required during the db initialization
type DBSettings struct {
SQLiteVacuum bool
}

// ExtractDBAndMigration will return a database connection, and migration function that should be used depending on a database connection string
func ExtractDBAndMigration(databaseURL string) (*sql.DB, func(*sql.DB) error, error) {
func ExtractDBAndMigration(databaseURL string, dbSettings DBSettings, logger *zap.Logger) (*sql.DB, func(*sql.DB) error, error) {
var db *sql.DB
var migrationFn func(*sql.DB) error
var err error

logger = logger.Named("db-setup")

dbURL := ""
if databaseURL != "" {
err := validateDBUrl(databaseURL)
Expand All @@ -41,7 +49,7 @@ func ExtractDBAndMigration(databaseURL string) (*sql.DB, func(*sql.DB) error, er
dbParams := dbURLParts[1]
switch dbEngine {
case "sqlite3":
db, migrationFn, err = sqlite.NewDB(dbParams)
db, migrationFn, err = sqlite.NewDB(dbParams, dbSettings.SQLiteVacuum, logger)
case "postgresql":
db, migrationFn, err = postgres.NewDB(dbURL)
default:
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/node/connectedness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestConnectionStatusChanges(t *testing.T) {
err = node2.Start(ctx)
require.NoError(t, err)

db, migration, err := sqlite.NewDB(":memory:")
db, migration, err := sqlite.NewDB(":memory:", false, utils.Logger())
require.NoError(t, err)
dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration))
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/node/wakunode2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
subs.Unsubscribe()

// NODE2: Filter Client/Store
db, migration, err := sqlite.NewDB(":memory:")
db, migration, err := sqlite.NewDB(":memory:", false, utils.Logger())
require.NoError(t, err)
dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration))
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/store/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func MemoryDB(t *testing.T) *persistence.DBStore {
var db *sql.DB
db, migration, err := sqlite.NewDB(":memory:")
db, migration, err := sqlite.NewDB(":memory:", false, utils.Logger())
require.NoError(t, err)

dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration))
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/rendezvous/rendezvous_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestRendezvous(t *testing.T) {
require.NoError(t, err)

var db *sql.DB
db, migration, err := sqlite.NewDB(":memory:")
db, migration, err := sqlite.NewDB(":memory:", false, utils.Logger())
require.NoError(t, err)

err = migration(db)
Expand Down

0 comments on commit e2b2662

Please sign in to comment.