From 5f70e1667270ea69bd2ecb471c66f57a01b42aa7 Mon Sep 17 00:00:00 2001 From: Alvar Penning Date: Wed, 28 Feb 2024 10:11:32 +0100 Subject: [PATCH] History Sync: Use single, distinct DB connection The only database tables with foreign key constraints are found in the history context. By default - and for good reasons - a DB internally consists of a cluster of several database connections. However, if a multi-master database server setup is used, it can - and will - happen that this DB contains sessions to different servers and related queries will be submitted in the wrong order, at least in the eye of some database servers. To prevent this problem, an extra DB with only one connection is used for the history sync. First, this required some refactoring on icingadb.DB, allowing to clone, copy or re-create a DB. --- cmd/icingadb/main.go | 5 ++++- pkg/config/database.go | 25 ++++++++++++------------ pkg/icingadb/db.go | 37 ++++++++++++++++++++++++++++++++---- pkg/icingadb/history/sync.go | 21 ++++++++++++++++++-- 4 files changed, 68 insertions(+), 20 deletions(-) diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 1c22afc79..15b82f58b 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -132,7 +132,10 @@ func run() int { cancelCtx() }() s := icingadb.NewSync(db, rc, logs.GetChildLogger("config-sync")) - hs := history.NewSync(db, rc, logs.GetChildLogger("history-sync")) + hs, err := history.NewSync(db, rc, logs.GetChildLogger("history-sync")) + if err != nil { + logger.Fatalf("%+v", errors.Wrap(err, "can't create history sync")) + } rt := icingadb.NewRuntimeUpdates(db, rc, logs.GetChildLogger("runtime-updates")) ods := overdue.NewSync(db, rc, logs.GetChildLogger("overdue-sync")) ret := history.NewRetention( diff --git a/pkg/config/database.go b/pkg/config/database.go index b42ff8e1c..7be48a1aa 100644 --- a/pkg/config/database.go +++ b/pkg/config/database.go @@ -128,19 +128,18 @@ func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) { return nil, unknownDbType(d.Type) } - db, err := sqlx.Open("icingadb-"+d.Type, dsn) - if err != nil { - return nil, errors.Wrap(err, "can't open database") - } - - db.SetMaxIdleConns(d.Options.MaxConnections / 3) - db.SetMaxOpenConns(d.Options.MaxConnections) - - db.Mapper = reflectx.NewMapperFunc("db", func(s string) string { - return utils.Key(s, '_') - }) - - return icingadb.NewDb(db, logger, &d.Options), nil + return icingadb.NewDb( + "icingadb-"+d.Type, dsn, + func(db *sqlx.DB) { + db.SetMaxIdleConns(d.Options.MaxConnections / 3) + db.SetMaxOpenConns(d.Options.MaxConnections) + + db.Mapper = reflectx.NewMapperFunc("db", func(s string) string { + return utils.Key(s, '_') + }) + }, + &d.Options, + logger) } // Validate checks constraints in the supplied database configuration and returns an error if they are violated. diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 4ff3e0dde..e476f623a 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -32,6 +32,10 @@ type DB struct { logger *logging.Logger tableSemaphores map[string]*semaphore.Weighted tableSemaphoresMu sync.Mutex + + driverName string + dataSourceName string + dbConfFunc func(db *sqlx.DB) } // Options define user configurable database options. @@ -74,14 +78,39 @@ func (o *Options) Validate() error { return nil } -// NewDb returns a new icingadb.DB wrapper for a pre-existing *sqlx.DB. -func NewDb(db *sqlx.DB, logger *logging.Logger, options *Options) *DB { +// NewDb returns a new icingadb.DB wrapper around a *sqlx.DB. +func NewDb( + driverName, dataSourceName string, + dbConfFunc func(db *sqlx.DB), + options *Options, + logger *logging.Logger, +) (*DB, error) { + db, err := sqlx.Open(driverName, dataSourceName) + if err != nil { + return nil, fmt.Errorf("can't open database: %w", err) + } + + dbConfFunc(db) + return &DB{ DB: db, - logger: logger, Options: options, + logger: logger, tableSemaphores: make(map[string]*semaphore.Weighted), - } + driverName: driverName, + dataSourceName: dataSourceName, + dbConfFunc: dbConfFunc, + }, nil +} + +// Copy this icingadb.DB into a new, independent icingadb.DB instance. +func (db *DB) Copy() (*DB, error) { + opts := *db.Options + return NewDb( + db.driverName, db.dataSourceName, + db.dbConfFunc, + &opts, + db.logger) } const ( diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go index dc8bc6117..9af33f726 100644 --- a/pkg/icingadb/history/sync.go +++ b/pkg/icingadb/history/sync.go @@ -2,6 +2,7 @@ package history import ( "context" + "fmt" "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/internal" "github.com/icinga/icingadb/pkg/com" @@ -30,16 +31,32 @@ type Sync struct { } // NewSync creates a new Sync. -func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *logging.Logger) *Sync { +func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *logging.Logger) (*Sync, error) { + // The only database tables with foreign key constraints are found in the history context. By default - and for good + // reasons - a DB internally consists of a cluster of several database connections. However, if a multi-master + // database server setup is used, it can - and will - happen that this DB contains sessions to different servers and + // related queries will be submitted in the wrong order, at least in the eye of some database servers. + // + // To prevent this problem, an extra DB with only one connection is used for the history sync. + db, err := db.Copy() + if err != nil { + return nil, fmt.Errorf("can't copy DB: %w", err) + } + + db.SetMaxIdleConns(1) + db.SetMaxOpenConns(1) + return &Sync{ db: db, redis: redis, logger: logger, - } + }, nil } // Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success. func (s Sync) Sync(ctx context.Context) error { + defer func() { _ = s.db.Close() }() + g, ctx := errgroup.WithContext(ctx) for key, pipeline := range syncPipelines {