diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 1c22afc79..15b82f58b 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -132,7 +132,10 @@ func run() int { cancelCtx() }() s := icingadb.NewSync(db, rc, logs.GetChildLogger("config-sync")) - hs := history.NewSync(db, rc, logs.GetChildLogger("history-sync")) + hs, err := history.NewSync(db, rc, logs.GetChildLogger("history-sync")) + if err != nil { + logger.Fatalf("%+v", errors.Wrap(err, "can't create history sync")) + } rt := icingadb.NewRuntimeUpdates(db, rc, logs.GetChildLogger("runtime-updates")) ods := overdue.NewSync(db, rc, logs.GetChildLogger("overdue-sync")) ret := history.NewRetention( diff --git a/pkg/config/database.go b/pkg/config/database.go index b42ff8e1c..7be48a1aa 100644 --- a/pkg/config/database.go +++ b/pkg/config/database.go @@ -128,19 +128,18 @@ func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) { return nil, unknownDbType(d.Type) } - db, err := sqlx.Open("icingadb-"+d.Type, dsn) - if err != nil { - return nil, errors.Wrap(err, "can't open database") - } - - db.SetMaxIdleConns(d.Options.MaxConnections / 3) - db.SetMaxOpenConns(d.Options.MaxConnections) - - db.Mapper = reflectx.NewMapperFunc("db", func(s string) string { - return utils.Key(s, '_') - }) - - return icingadb.NewDb(db, logger, &d.Options), nil + return icingadb.NewDb( + "icingadb-"+d.Type, dsn, + func(db *sqlx.DB) { + db.SetMaxIdleConns(d.Options.MaxConnections / 3) + db.SetMaxOpenConns(d.Options.MaxConnections) + + db.Mapper = reflectx.NewMapperFunc("db", func(s string) string { + return utils.Key(s, '_') + }) + }, + &d.Options, + logger) } // Validate checks constraints in the supplied database configuration and returns an error if they are violated. diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 4ff3e0dde..e476f623a 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -32,6 +32,10 @@ type DB struct { logger *logging.Logger tableSemaphores map[string]*semaphore.Weighted tableSemaphoresMu sync.Mutex + + driverName string + dataSourceName string + dbConfFunc func(db *sqlx.DB) } // Options define user configurable database options. @@ -74,14 +78,39 @@ func (o *Options) Validate() error { return nil } -// NewDb returns a new icingadb.DB wrapper for a pre-existing *sqlx.DB. -func NewDb(db *sqlx.DB, logger *logging.Logger, options *Options) *DB { +// NewDb returns a new icingadb.DB wrapper around a *sqlx.DB. +func NewDb( + driverName, dataSourceName string, + dbConfFunc func(db *sqlx.DB), + options *Options, + logger *logging.Logger, +) (*DB, error) { + db, err := sqlx.Open(driverName, dataSourceName) + if err != nil { + return nil, fmt.Errorf("can't open database: %w", err) + } + + dbConfFunc(db) + return &DB{ DB: db, - logger: logger, Options: options, + logger: logger, tableSemaphores: make(map[string]*semaphore.Weighted), - } + driverName: driverName, + dataSourceName: dataSourceName, + dbConfFunc: dbConfFunc, + }, nil +} + +// Copy this icingadb.DB into a new, independent icingadb.DB instance. +func (db *DB) Copy() (*DB, error) { + opts := *db.Options + return NewDb( + db.driverName, db.dataSourceName, + db.dbConfFunc, + &opts, + db.logger) } const ( diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go index dc8bc6117..9af33f726 100644 --- a/pkg/icingadb/history/sync.go +++ b/pkg/icingadb/history/sync.go @@ -2,6 +2,7 @@ package history import ( "context" + "fmt" "github.com/go-redis/redis/v8" "github.com/icinga/icingadb/internal" "github.com/icinga/icingadb/pkg/com" @@ -30,16 +31,32 @@ type Sync struct { } // NewSync creates a new Sync. -func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *logging.Logger) *Sync { +func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *logging.Logger) (*Sync, error) { + // The only database tables with foreign key constraints are found in the history context. By default - and for good + // reasons - a DB internally consists of a cluster of several database connections. However, if a multi-master + // database server setup is used, it can - and will - happen that this DB contains sessions to different servers and + // related queries will be submitted in the wrong order, at least in the eye of some database servers. + // + // To prevent this problem, an extra DB with only one connection is used for the history sync. + db, err := db.Copy() + if err != nil { + return nil, fmt.Errorf("can't copy DB: %w", err) + } + + db.SetMaxIdleConns(1) + db.SetMaxOpenConns(1) + return &Sync{ db: db, redis: redis, logger: logger, - } + }, nil } // Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success. func (s Sync) Sync(ctx context.Context) error { + defer func() { _ = s.db.Close() }() + g, ctx := errgroup.WithContext(ctx) for key, pipeline := range syncPipelines {