Skip to content

Commit

Permalink
History Sync: Use single, distinct DB connection
Browse files Browse the repository at this point in the history
The only database tables with foreign key constraints are found in the
history context. By default - and for good reasons - a DB internally
consists of a cluster of several database connections. However, if a
multi-master database server setup is used, it can - and will - happen
that this DB contains sessions to different servers and related queries
will be submitted in the wrong order, at least in the eye of some
database servers.

To prevent this problem, an extra DB with only one connection is used
for the history sync. First, this required some refactoring on
icingadb.DB, allowing to clone, copy or re-create a DB.
  • Loading branch information
oxzi committed Feb 28, 2024
1 parent 0e9810c commit 5f70e16
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 20 deletions.
5 changes: 4 additions & 1 deletion cmd/icingadb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,10 @@ func run() int {
cancelCtx()
}()
s := icingadb.NewSync(db, rc, logs.GetChildLogger("config-sync"))
hs := history.NewSync(db, rc, logs.GetChildLogger("history-sync"))
hs, err := history.NewSync(db, rc, logs.GetChildLogger("history-sync"))
if err != nil {
logger.Fatalf("%+v", errors.Wrap(err, "can't create history sync"))
}
rt := icingadb.NewRuntimeUpdates(db, rc, logs.GetChildLogger("runtime-updates"))
ods := overdue.NewSync(db, rc, logs.GetChildLogger("overdue-sync"))
ret := history.NewRetention(
Expand Down
25 changes: 12 additions & 13 deletions pkg/config/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,19 +128,18 @@ func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) {
return nil, unknownDbType(d.Type)
}

db, err := sqlx.Open("icingadb-"+d.Type, dsn)
if err != nil {
return nil, errors.Wrap(err, "can't open database")
}

db.SetMaxIdleConns(d.Options.MaxConnections / 3)
db.SetMaxOpenConns(d.Options.MaxConnections)

db.Mapper = reflectx.NewMapperFunc("db", func(s string) string {
return utils.Key(s, '_')
})

return icingadb.NewDb(db, logger, &d.Options), nil
return icingadb.NewDb(
"icingadb-"+d.Type, dsn,
func(db *sqlx.DB) {
db.SetMaxIdleConns(d.Options.MaxConnections / 3)
db.SetMaxOpenConns(d.Options.MaxConnections)

db.Mapper = reflectx.NewMapperFunc("db", func(s string) string {
return utils.Key(s, '_')
})
},
&d.Options,
logger)
}

// Validate checks constraints in the supplied database configuration and returns an error if they are violated.
Expand Down
37 changes: 33 additions & 4 deletions pkg/icingadb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ type DB struct {
logger *logging.Logger
tableSemaphores map[string]*semaphore.Weighted
tableSemaphoresMu sync.Mutex

driverName string
dataSourceName string
dbConfFunc func(db *sqlx.DB)
}

// Options define user configurable database options.
Expand Down Expand Up @@ -74,14 +78,39 @@ func (o *Options) Validate() error {
return nil
}

// NewDb returns a new icingadb.DB wrapper for a pre-existing *sqlx.DB.
func NewDb(db *sqlx.DB, logger *logging.Logger, options *Options) *DB {
// NewDb returns a new icingadb.DB wrapper around a *sqlx.DB.
func NewDb(
driverName, dataSourceName string,
dbConfFunc func(db *sqlx.DB),
options *Options,
logger *logging.Logger,
) (*DB, error) {
db, err := sqlx.Open(driverName, dataSourceName)
if err != nil {
return nil, fmt.Errorf("can't open database: %w", err)
}

dbConfFunc(db)

return &DB{
DB: db,
logger: logger,
Options: options,
logger: logger,
tableSemaphores: make(map[string]*semaphore.Weighted),
}
driverName: driverName,
dataSourceName: dataSourceName,
dbConfFunc: dbConfFunc,
}, nil
}

// Copy this icingadb.DB into a new, independent icingadb.DB instance.
func (db *DB) Copy() (*DB, error) {
opts := *db.Options
return NewDb(
db.driverName, db.dataSourceName,
db.dbConfFunc,
&opts,
db.logger)
}

const (
Expand Down
21 changes: 19 additions & 2 deletions pkg/icingadb/history/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package history

import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/internal"
"github.com/icinga/icingadb/pkg/com"
Expand Down Expand Up @@ -30,16 +31,32 @@ type Sync struct {
}

// NewSync creates a new Sync.
func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *logging.Logger) *Sync {
func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *logging.Logger) (*Sync, error) {
// The only database tables with foreign key constraints are found in the history context. By default - and for good
// reasons - a DB internally consists of a cluster of several database connections. However, if a multi-master
// database server setup is used, it can - and will - happen that this DB contains sessions to different servers and
// related queries will be submitted in the wrong order, at least in the eye of some database servers.
//
// To prevent this problem, an extra DB with only one connection is used for the history sync.
db, err := db.Copy()
if err != nil {
return nil, fmt.Errorf("can't copy DB: %w", err)
}

db.SetMaxIdleConns(1)
db.SetMaxOpenConns(1)

return &Sync{
db: db,
redis: redis,
logger: logger,
}
}, nil
}

// Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success.
func (s Sync) Sync(ctx context.Context) error {
defer func() { _ = s.db.Close() }()

g, ctx := errgroup.WithContext(ctx)

for key, pipeline := range syncPipelines {
Expand Down

0 comments on commit 5f70e16

Please sign in to comment.