diff --git a/pkg/config/database.go b/pkg/config/database.go index b42ff8e1c..72f59ca78 100644 --- a/pkg/config/database.go +++ b/pkg/config/database.go @@ -61,7 +61,11 @@ func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) { config.DBName = d.Database config.Timeout = time.Minute - config.Params = map[string]string{"sql_mode": "ANSI_QUOTES"} + + config.Params = map[string]string{ + "sql_mode": "ANSI_QUOTES", + "wsrep_sync_wait": strconv.FormatInt(int64(d.Options.WsrepSyncWait), 10), + } tlsConfig, err := d.TlsOptions.MakeConfig(d.Host) if err != nil { diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index f529db445..4c63e0893 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -85,7 +85,7 @@ func (d Driver) OpenConnector(name string) (driver.Connector, error) { // Register makes our database Driver available under the name "icingadb-*sql". func Register(logger *logging.Logger) { - sql.Register(MySQL, &Driver{ctxDriver: &mysql.MySQLDriver{}, Logger: logger}) + sql.Register(MySQL, &Driver{ctxDriver: MySQLDriver{}, Logger: logger}) sql.Register(PostgreSQL, &Driver{ctxDriver: PgSQLDriver{}, Logger: logger}) _ = mysql.SetLogger(mysqlLogger(func(v ...interface{}) { logger.Debug(v...) })) sqlx.BindDriver(PostgreSQL, sqlx.DOLLAR) diff --git a/pkg/driver/mysql.go b/pkg/driver/mysql.go new file mode 100644 index 000000000..c59870139 --- /dev/null +++ b/pkg/driver/mysql.go @@ -0,0 +1,119 @@ +package driver + +import ( + "context" + "database/sql/driver" + "github.com/go-sql-driver/mysql" + "github.com/pkg/errors" + "strconv" +) + +// MySQLDriver extends mysql.MySQLDriver with auto-SETting Galera cluster options. +type MySQLDriver struct { + mysql.MySQLDriver +} + +// Open implements the driver.Driver interface. +func (md MySQLDriver) Open(name string) (driver.Conn, error) { + connector, err := md.OpenConnector(name) + if err != nil { + return nil, err + } + + return connector.Connect(context.Background()) +} + +// OpenConnector implements the driver.DriverContext interface. +func (md MySQLDriver) OpenConnector(name string) (driver.Connector, error) { + var wsrepSyncWait int64 + + if config, err := mysql.ParseDSN(name); err == nil { + if s, ok := config.Params["wsrep_sync_wait"]; ok { + if i, err := strconv.ParseInt(s, 10, 64); err == nil { + // MySQL single nodes don't know wsrep_sync_wait and fail with error 1193 "Unknown system variable". + // We have to SET it manually later and swallow error 1193 not to fail our connections. + wsrepSyncWait = i + delete(config.Params, "wsrep_sync_wait") + name = config.FormatDSN() + } + } + } + + connector, err := md.MySQLDriver.OpenConnector(name) + if err != nil { + return nil, err + } + + return &galeraAwareConnector{connector, wsrepSyncWait, md}, nil +} + +// galeraAwareConnector extends mysql.connector with auto-SETting Galera cluster options. +type galeraAwareConnector struct { + driver.Connector + + wsrepSyncWait int64 + driver driver.Driver +} + +// Connect implements the driver.Connector interface. +func (gac *galeraAwareConnector) Connect(ctx context.Context) (driver.Conn, error) { + conn, err := gac.Connector.Connect(ctx) + if err != nil { + return nil, err + } + + if err := setGaleraOpts(ctx, conn, gac.wsrepSyncWait); err != nil { + _ = conn.Close() + return nil, err + } + + return conn, nil +} + +// Driver implements the driver.Connector interface. +func (gac *galeraAwareConnector) Driver() driver.Driver { + return gac.driver +} + +var errUnknownSysVar = &mysql.MySQLError{Number: 1193} + +// setGaleraOpts tries SET SESSION wsrep_sync_wait. +// +// This ensures causality checks will take place before execution, +// ensuring that every statement is executed on a fully synced node. +// https://mariadb.com/kb/en/galera-cluster-system-variables/#wsrep_sync_wait +// +// It prevents running into foreign key errors while inserting into linked tables on different MySQL nodes. +// Error 1193 "Unknown system variable" is ignored to support MySQL single nodes. +func setGaleraOpts(ctx context.Context, conn driver.Conn, wsrepSyncWait int64) error { + const galeraOpts = "SET SESSION wsrep_sync_wait=?" + + stmt, err := conn.(driver.ConnPrepareContext).PrepareContext(ctx, galeraOpts) + if err != nil { + err = errors.Wrap(err, "can't prepare "+galeraOpts) + } else { + _, err = stmt.(driver.StmtExecContext).ExecContext(ctx, []driver.NamedValue{{Value: wsrepSyncWait}}) + if err != nil { + err = errors.Wrap(err, "can't execute "+galeraOpts) + } + } + + if err != nil && errors.Is(err, errUnknownSysVar) { + err = nil + } + + if stmt != nil { + if errClose := stmt.Close(); errClose != nil && err == nil { + err = errors.Wrap(errClose, "can't close statement "+galeraOpts) + } + } + + return err +} + +// Assert interface compliance. +var ( + _ driver.Driver = MySQLDriver{} + _ driver.DriverContext = MySQLDriver{} + _ driver.Connector = (*galeraAwareConnector)(nil) +) diff --git a/pkg/driver/mysql_test.go b/pkg/driver/mysql_test.go new file mode 100644 index 000000000..16587674a --- /dev/null +++ b/pkg/driver/mysql_test.go @@ -0,0 +1,178 @@ +package driver + +import ( + "context" + "database/sql/driver" + "github.com/go-sql-driver/mysql" + "github.com/stretchr/testify/assert" + "io" + "os" + "testing" +) + +func TestSetGaleraOpts(t *testing.T) { + tolerated := &mysql.MySQLError{ + Number: errUnknownSysVar.Number, + SQLState: [5]byte{255, 0, 42, 23, 7}, // Shall not confuse error comparison + Message: "This unusual text shall not confuse error comparison.", + } + + almostTolerated := &mysql.MySQLError{} + *almostTolerated = *tolerated + almostTolerated.Number-- + + notTolerated := io.EOF + ignoredCodeLocation := os.ErrPermission + + subtests := []struct { + name string + input testConn + output error + }{{ + name: "Conn PrepareContext returns error", + input: testConn{prepareError: notTolerated}, + output: notTolerated, + }, { + name: "Conn PrepareContext returns MySQLError", + input: testConn{prepareError: almostTolerated}, + output: almostTolerated, + }, { + name: "Conn PrepareContext returns MySQLError 1193", + input: testConn{prepareError: tolerated}, + output: nil, + }, { + name: "Stmt ExecContext returns error", + input: testConn{preparedStmt: &testStmt{ + execError: notTolerated, + }}, + output: notTolerated, + }, { + name: "Stmt ExecContext and Stmt Close return error", + input: testConn{preparedStmt: &testStmt{ + execError: notTolerated, + closeError: ignoredCodeLocation, + }}, + output: notTolerated, + }, { + name: "Stmt ExecContext returns MySQLError", + input: testConn{preparedStmt: &testStmt{ + execError: almostTolerated, + }}, + output: almostTolerated, + }, { + name: "Stmt ExecContext returns MySQLError and Stmt Close returns error", + input: testConn{preparedStmt: &testStmt{ + execError: almostTolerated, + closeError: ignoredCodeLocation, + }}, + output: almostTolerated, + }, { + name: "Stmt ExecContext returns MySQLError 1193", + input: testConn{preparedStmt: &testStmt{ + execError: tolerated, + }}, + output: nil, + }, { + name: "Stmt ExecContext and Stmt Close return MySQLError 1193", + input: testConn{preparedStmt: &testStmt{ + execError: tolerated, + closeError: tolerated, + }}, + output: tolerated, + }, { + name: "Stmt Close returns MySQLError 1193", + input: testConn{preparedStmt: &testStmt{ + execResult: driver.ResultNoRows, + closeError: tolerated, + }}, + output: tolerated, + }, { + name: "no errors", + input: testConn{preparedStmt: &testStmt{ + execResult: driver.ResultNoRows, + }}, + output: nil, + }} + + for _, st := range subtests { + t.Run(st.name, func(t *testing.T) { + assert.ErrorIs(t, setGaleraOpts(context.Background(), &st.input, 7), st.output) + assert.GreaterOrEqual(t, st.input.prepareCalls, uint8(1)) + + if ts, ok := st.input.preparedStmt.(*testStmt); ok { + assert.GreaterOrEqual(t, ts.execCalls, st.input.prepareCalls) + assert.GreaterOrEqual(t, ts.closeCalls, st.input.prepareCalls) + } + }) + } +} + +type testStmt struct { + execResult driver.Result + execError error + execCalls uint8 + closeError error + closeCalls uint8 +} + +// Close implements the driver.Stmt interface. +func (ts *testStmt) Close() error { + ts.closeCalls++ + return ts.closeError +} + +// NumInput implements the driver.Stmt interface. +func (*testStmt) NumInput() int { + panic("don't call me") +} + +// Exec implements the driver.Stmt interface. +func (*testStmt) Exec([]driver.Value) (driver.Result, error) { + panic("don't call me") +} + +// Query implements the driver.Stmt interface. +func (*testStmt) Query([]driver.Value) (driver.Rows, error) { + panic("don't call me") +} + +// ExecContext implements the driver.StmtExecContext interface. +func (ts *testStmt) ExecContext(context.Context, []driver.NamedValue) (driver.Result, error) { + ts.execCalls++ + return ts.execResult, ts.execError +} + +type testConn struct { + preparedStmt driver.Stmt + prepareError error + prepareCalls uint8 +} + +// Prepare implements the driver.Conn interface. +func (*testConn) Prepare(string) (driver.Stmt, error) { + panic("don't call me") +} + +// Close implements the driver.Conn interface. +func (*testConn) Close() error { + panic("don't call me") +} + +// Begin implements the driver.Conn interface. +func (*testConn) Begin() (driver.Tx, error) { + panic("don't call me") +} + +// PrepareContext implements the driver.ConnPrepareContext interface. +func (tc *testConn) PrepareContext(context.Context, string) (driver.Stmt, error) { + tc.prepareCalls++ + return tc.preparedStmt, tc.prepareError +} + +// Assert interface compliance. +var ( + _ driver.Conn = (*testConn)(nil) + _ driver.ConnPrepareContext = (*testConn)(nil) + _ driver.Stmt = (*testStmt)(nil) + _ driver.StmtExecContext = (*testStmt)(nil) +) diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 4ff3e0dde..91901608d 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -54,6 +54,10 @@ type Options struct { // MaxRowsPerTransaction defines the maximum number of rows per transaction. // The default is 2^13, which in our tests showed the best performance in terms of execution time and parallelism. MaxRowsPerTransaction int `yaml:"max_rows_per_transaction" default:"8192"` + + // WsrepSyncWait defines which kinds of SQL statements catch up all pending sync between nodes first, see: + // https://mariadb.com/kb/en/galera-cluster-system-variables/#wsrep_sync_wait + WsrepSyncWait int `yaml:"wsrep_sync_wait" default:"7"` } // Validate checks constraints in the supplied database options and returns an error if they are violated.