Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

History Sync: Use single, distinct DB connection #681

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/icingadb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,10 @@ func run() int {
cancelCtx()
}()
s := icingadb.NewSync(db, rc, logs.GetChildLogger("config-sync"))
hs := history.NewSync(db, rc, logs.GetChildLogger("history-sync"))
hs, err := history.NewSync(db, rc, logs.GetChildLogger("history-sync"))
if err != nil {
logger.Fatalf("%+v", errors.Wrap(err, "can't create history sync"))
}
rt := icingadb.NewRuntimeUpdates(db, rc, logs.GetChildLogger("runtime-updates"))
ods := overdue.NewSync(db, rc, logs.GetChildLogger("overdue-sync"))
ret := history.NewRetention(
Expand Down
25 changes: 12 additions & 13 deletions pkg/config/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,19 +128,18 @@ func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) {
return nil, unknownDbType(d.Type)
}

db, err := sqlx.Open("icingadb-"+d.Type, dsn)
if err != nil {
return nil, errors.Wrap(err, "can't open database")
}

db.SetMaxIdleConns(d.Options.MaxConnections / 3)
db.SetMaxOpenConns(d.Options.MaxConnections)

db.Mapper = reflectx.NewMapperFunc("db", func(s string) string {
return utils.Key(s, '_')
})

return icingadb.NewDb(db, logger, &d.Options), nil
return icingadb.NewDb(
"icingadb-"+d.Type, dsn,
func(db *sqlx.DB) {
db.SetMaxIdleConns(d.Options.MaxConnections / 3)
db.SetMaxOpenConns(d.Options.MaxConnections)

db.Mapper = reflectx.NewMapperFunc("db", func(s string) string {
return utils.Key(s, '_')
})
},
&d.Options,
logger)
}

// Validate checks constraints in the supplied database configuration and returns an error if they are violated.
Expand Down
37 changes: 33 additions & 4 deletions pkg/icingadb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ type DB struct {
logger *logging.Logger
tableSemaphores map[string]*semaphore.Weighted
tableSemaphoresMu sync.Mutex

driverName string
dataSourceName string
dbConfFunc func(db *sqlx.DB)
}

// Options define user configurable database options.
Expand Down Expand Up @@ -74,14 +78,39 @@ func (o *Options) Validate() error {
return nil
}

// NewDb returns a new icingadb.DB wrapper for a pre-existing *sqlx.DB.
func NewDb(db *sqlx.DB, logger *logging.Logger, options *Options) *DB {
// NewDb returns a new icingadb.DB wrapper around a *sqlx.DB.
func NewDb(
driverName, dataSourceName string,
dbConfFunc func(db *sqlx.DB),
options *Options,
logger *logging.Logger,
) (*DB, error) {
db, err := sqlx.Open(driverName, dataSourceName)
if err != nil {
return nil, fmt.Errorf("can't open database: %w", err)
}

dbConfFunc(db)

return &DB{
DB: db,
logger: logger,
Options: options,
logger: logger,
tableSemaphores: make(map[string]*semaphore.Weighted),
}
driverName: driverName,
dataSourceName: dataSourceName,
dbConfFunc: dbConfFunc,
}, nil
}

// Copy this icingadb.DB into a new, independent icingadb.DB instance.
func (db *DB) Copy() (*DB, error) {
opts := *db.Options
return NewDb(
db.driverName, db.dataSourceName,
db.dbConfFunc,
&opts,
db.logger)
}

const (
Expand Down
21 changes: 19 additions & 2 deletions pkg/icingadb/history/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package history

import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/internal"
"github.com/icinga/icingadb/pkg/com"
Expand Down Expand Up @@ -30,16 +31,32 @@ type Sync struct {
}

// NewSync creates a new Sync.
func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *logging.Logger) *Sync {
func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *logging.Logger) (*Sync, error) {
// The only database tables with foreign key constraints are found in the history context. By default - and for good
// reasons - a DB internally consists of a cluster of several database connections. However, if a multi-master
// database server setup is used, it can - and will - happen that this DB contains sessions to different servers and
// related queries will be submitted in the wrong order, at least in the eye of some database servers.
//
// To prevent this problem, an extra DB with only one connection is used for the history sync.
db, err := db.Copy()
if err != nil {
return nil, fmt.Errorf("can't copy DB: %w", err)
}

db.SetMaxIdleConns(1)
db.SetMaxOpenConns(1)

return &Sync{
db: db,
redis: redis,
logger: logger,
}
}, nil
}

// Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success.
func (s Sync) Sync(ctx context.Context) error {
defer func() { _ = s.db.Close() }()

g, ctx := errgroup.WithContext(ctx)

for key, pipeline := range syncPipelines {
Expand Down
Loading