From 2aa17783a60021edbff27b2ba6543c40b4906529 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 8 Mar 2024 16:37:49 +0100 Subject: [PATCH 1/2] MySQL driver: on connect try setting wsrep_sync_wait=7, swallow error 1193 In Galera clusters wsrep_sync_wait=7 lets statements catch up all pending sync between nodes first. This way new child rows await fresh parent ones from other nodes not to run into foreign key errors. MySQL single nodes will reject this with error 1193 "Unknown system variable" which is OK. --- pkg/driver/driver.go | 2 +- pkg/driver/mysql.go | 100 ++++++++++++++++++++++ pkg/driver/mysql_test.go | 178 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 279 insertions(+), 1 deletion(-) create mode 100644 pkg/driver/mysql.go create mode 100644 pkg/driver/mysql_test.go diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index f529db445..4c63e0893 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -85,7 +85,7 @@ func (d Driver) OpenConnector(name string) (driver.Connector, error) { // Register makes our database Driver available under the name "icingadb-*sql". func Register(logger *logging.Logger) { - sql.Register(MySQL, &Driver{ctxDriver: &mysql.MySQLDriver{}, Logger: logger}) + sql.Register(MySQL, &Driver{ctxDriver: MySQLDriver{}, Logger: logger}) sql.Register(PostgreSQL, &Driver{ctxDriver: PgSQLDriver{}, Logger: logger}) _ = mysql.SetLogger(mysqlLogger(func(v ...interface{}) { logger.Debug(v...) })) sqlx.BindDriver(PostgreSQL, sqlx.DOLLAR) diff --git a/pkg/driver/mysql.go b/pkg/driver/mysql.go new file mode 100644 index 000000000..728ca2bd9 --- /dev/null +++ b/pkg/driver/mysql.go @@ -0,0 +1,100 @@ +package driver + +import ( + "context" + "database/sql/driver" + "github.com/go-sql-driver/mysql" + "github.com/pkg/errors" +) + +// MySQLDriver extends mysql.MySQLDriver with auto-SETting Galera cluster options. +type MySQLDriver struct { + mysql.MySQLDriver +} + +// Open implements the driver.Driver interface. +func (md MySQLDriver) Open(name string) (driver.Conn, error) { + connector, err := md.OpenConnector(name) + if err != nil { + return nil, err + } + + return connector.Connect(context.Background()) +} + +// OpenConnector implements the driver.DriverContext interface. +func (md MySQLDriver) OpenConnector(name string) (driver.Connector, error) { + connector, err := md.MySQLDriver.OpenConnector(name) + if err != nil { + return nil, err + } + + return &galeraAwareConnector{connector, md}, nil +} + +// galeraAwareConnector extends mysql.connector with auto-SETting Galera cluster options. +type galeraAwareConnector struct { + driver.Connector + + driver driver.Driver +} + +// Connect implements the driver.Connector interface. +func (gac *galeraAwareConnector) Connect(ctx context.Context) (driver.Conn, error) { + conn, err := gac.Connector.Connect(ctx) + if err != nil { + return nil, err + } + + if err := setGaleraOpts(ctx, conn); err != nil { + _ = conn.Close() + return nil, err + } + + return conn, nil +} + +// Driver implements the driver.Connector interface. +func (gac *galeraAwareConnector) Driver() driver.Driver { + return gac.driver +} + +var errUnknownSysVar = &mysql.MySQLError{Number: 1193} + +// setGaleraOpts tries SET SESSION wsrep_sync_wait=7. +// +// This ensures causality checks will take place before executing anything, +// ensuring that every statement is executed on a fully synced node. +// https://mariadb.com/kb/en/galera-cluster-system-variables/#wsrep_sync_wait +// +// It prevents running into foreign key errors while inserting into linked tables on different MySQL nodes. +// Error 1193 "Unknown system variable" is ignored to support MySQL single nodes. +func setGaleraOpts(ctx context.Context, conn driver.Conn) error { + const galeraOpts = "SET SESSION wsrep_sync_wait=7" + + stmt, err := conn.(driver.ConnPrepareContext).PrepareContext(ctx, galeraOpts) + if err != nil { + err = errors.Wrap(err, "can't prepare "+galeraOpts) + } else if _, err = stmt.(driver.StmtExecContext).ExecContext(ctx, nil); err != nil { + err = errors.Wrap(err, "can't execute "+galeraOpts) + } + + if err != nil && errors.Is(err, errUnknownSysVar) { + err = nil + } + + if stmt != nil { + if errClose := stmt.Close(); errClose != nil && err == nil { + err = errors.Wrap(errClose, "can't close statement "+galeraOpts) + } + } + + return err +} + +// Assert interface compliance. +var ( + _ driver.Driver = MySQLDriver{} + _ driver.DriverContext = MySQLDriver{} + _ driver.Connector = (*galeraAwareConnector)(nil) +) diff --git a/pkg/driver/mysql_test.go b/pkg/driver/mysql_test.go new file mode 100644 index 000000000..38afc531a --- /dev/null +++ b/pkg/driver/mysql_test.go @@ -0,0 +1,178 @@ +package driver + +import ( + "context" + "database/sql/driver" + "github.com/go-sql-driver/mysql" + "github.com/stretchr/testify/assert" + "io" + "os" + "testing" +) + +func TestSetGaleraOpts(t *testing.T) { + tolerated := &mysql.MySQLError{ + Number: errUnknownSysVar.Number, + SQLState: [5]byte{255, 0, 42, 23, 7}, // Shall not confuse error comparison + Message: "This unusual text shall not confuse error comparison.", + } + + almostTolerated := &mysql.MySQLError{} + *almostTolerated = *tolerated + almostTolerated.Number-- + + notTolerated := io.EOF + ignoredCodeLocation := os.ErrPermission + + subtests := []struct { + name string + input testConn + output error + }{{ + name: "Conn PrepareContext returns error", + input: testConn{prepareError: notTolerated}, + output: notTolerated, + }, { + name: "Conn PrepareContext returns MySQLError", + input: testConn{prepareError: almostTolerated}, + output: almostTolerated, + }, { + name: "Conn PrepareContext returns MySQLError 1193", + input: testConn{prepareError: tolerated}, + output: nil, + }, { + name: "Stmt ExecContext returns error", + input: testConn{preparedStmt: &testStmt{ + execError: notTolerated, + }}, + output: notTolerated, + }, { + name: "Stmt ExecContext and Stmt Close return error", + input: testConn{preparedStmt: &testStmt{ + execError: notTolerated, + closeError: ignoredCodeLocation, + }}, + output: notTolerated, + }, { + name: "Stmt ExecContext returns MySQLError", + input: testConn{preparedStmt: &testStmt{ + execError: almostTolerated, + }}, + output: almostTolerated, + }, { + name: "Stmt ExecContext returns MySQLError and Stmt Close returns error", + input: testConn{preparedStmt: &testStmt{ + execError: almostTolerated, + closeError: ignoredCodeLocation, + }}, + output: almostTolerated, + }, { + name: "Stmt ExecContext returns MySQLError 1193", + input: testConn{preparedStmt: &testStmt{ + execError: tolerated, + }}, + output: nil, + }, { + name: "Stmt ExecContext and Stmt Close return MySQLError 1193", + input: testConn{preparedStmt: &testStmt{ + execError: tolerated, + closeError: tolerated, + }}, + output: tolerated, + }, { + name: "Stmt Close returns MySQLError 1193", + input: testConn{preparedStmt: &testStmt{ + execResult: driver.ResultNoRows, + closeError: tolerated, + }}, + output: tolerated, + }, { + name: "no errors", + input: testConn{preparedStmt: &testStmt{ + execResult: driver.ResultNoRows, + }}, + output: nil, + }} + + for _, st := range subtests { + t.Run(st.name, func(t *testing.T) { + assert.ErrorIs(t, setGaleraOpts(context.Background(), &st.input), st.output) + assert.GreaterOrEqual(t, st.input.prepareCalls, uint8(1)) + + if ts, ok := st.input.preparedStmt.(*testStmt); ok { + assert.GreaterOrEqual(t, ts.execCalls, st.input.prepareCalls) + assert.GreaterOrEqual(t, ts.closeCalls, st.input.prepareCalls) + } + }) + } +} + +type testStmt struct { + execResult driver.Result + execError error + execCalls uint8 + closeError error + closeCalls uint8 +} + +// Close implements the driver.Stmt interface. +func (ts *testStmt) Close() error { + ts.closeCalls++ + return ts.closeError +} + +// NumInput implements the driver.Stmt interface. +func (*testStmt) NumInput() int { + panic("don't call me") +} + +// Exec implements the driver.Stmt interface. +func (*testStmt) Exec([]driver.Value) (driver.Result, error) { + panic("don't call me") +} + +// Query implements the driver.Stmt interface. +func (*testStmt) Query([]driver.Value) (driver.Rows, error) { + panic("don't call me") +} + +// ExecContext implements the driver.StmtExecContext interface. +func (ts *testStmt) ExecContext(context.Context, []driver.NamedValue) (driver.Result, error) { + ts.execCalls++ + return ts.execResult, ts.execError +} + +type testConn struct { + preparedStmt driver.Stmt + prepareError error + prepareCalls uint8 +} + +// Prepare implements the driver.Conn interface. +func (*testConn) Prepare(string) (driver.Stmt, error) { + panic("don't call me") +} + +// Close implements the driver.Conn interface. +func (*testConn) Close() error { + panic("don't call me") +} + +// Begin implements the driver.Conn interface. +func (*testConn) Begin() (driver.Tx, error) { + panic("don't call me") +} + +// PrepareContext implements the driver.ConnPrepareContext interface. +func (tc *testConn) PrepareContext(context.Context, string) (driver.Stmt, error) { + tc.prepareCalls++ + return tc.preparedStmt, tc.prepareError +} + +// Assert interface compliance. +var ( + _ driver.Conn = (*testConn)(nil) + _ driver.ConnPrepareContext = (*testConn)(nil) + _ driver.Stmt = (*testStmt)(nil) + _ driver.StmtExecContext = (*testStmt)(nil) +) From 2f4a5425f16b740a64b72d5babe1e23aebaac725 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 8 Mar 2024 17:38:27 +0100 Subject: [PATCH 2/2] Make value for SET SESSION wsrep_sync_wait= configurable --- pkg/config/database.go | 6 +++++- pkg/driver/mysql.go | 37 ++++++++++++++++++++++++++++--------- pkg/driver/mysql_test.go | 2 +- pkg/icingadb/db.go | 4 ++++ 4 files changed, 38 insertions(+), 11 deletions(-) diff --git a/pkg/config/database.go b/pkg/config/database.go index b42ff8e1c..72f59ca78 100644 --- a/pkg/config/database.go +++ b/pkg/config/database.go @@ -61,7 +61,11 @@ func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) { config.DBName = d.Database config.Timeout = time.Minute - config.Params = map[string]string{"sql_mode": "ANSI_QUOTES"} + + config.Params = map[string]string{ + "sql_mode": "ANSI_QUOTES", + "wsrep_sync_wait": strconv.FormatInt(int64(d.Options.WsrepSyncWait), 10), + } tlsConfig, err := d.TlsOptions.MakeConfig(d.Host) if err != nil { diff --git a/pkg/driver/mysql.go b/pkg/driver/mysql.go index 728ca2bd9..c59870139 100644 --- a/pkg/driver/mysql.go +++ b/pkg/driver/mysql.go @@ -5,6 +5,7 @@ import ( "database/sql/driver" "github.com/go-sql-driver/mysql" "github.com/pkg/errors" + "strconv" ) // MySQLDriver extends mysql.MySQLDriver with auto-SETting Galera cluster options. @@ -24,19 +25,34 @@ func (md MySQLDriver) Open(name string) (driver.Conn, error) { // OpenConnector implements the driver.DriverContext interface. func (md MySQLDriver) OpenConnector(name string) (driver.Connector, error) { + var wsrepSyncWait int64 + + if config, err := mysql.ParseDSN(name); err == nil { + if s, ok := config.Params["wsrep_sync_wait"]; ok { + if i, err := strconv.ParseInt(s, 10, 64); err == nil { + // MySQL single nodes don't know wsrep_sync_wait and fail with error 1193 "Unknown system variable". + // We have to SET it manually later and swallow error 1193 not to fail our connections. + wsrepSyncWait = i + delete(config.Params, "wsrep_sync_wait") + name = config.FormatDSN() + } + } + } + connector, err := md.MySQLDriver.OpenConnector(name) if err != nil { return nil, err } - return &galeraAwareConnector{connector, md}, nil + return &galeraAwareConnector{connector, wsrepSyncWait, md}, nil } // galeraAwareConnector extends mysql.connector with auto-SETting Galera cluster options. type galeraAwareConnector struct { driver.Connector - driver driver.Driver + wsrepSyncWait int64 + driver driver.Driver } // Connect implements the driver.Connector interface. @@ -46,7 +62,7 @@ func (gac *galeraAwareConnector) Connect(ctx context.Context) (driver.Conn, erro return nil, err } - if err := setGaleraOpts(ctx, conn); err != nil { + if err := setGaleraOpts(ctx, conn, gac.wsrepSyncWait); err != nil { _ = conn.Close() return nil, err } @@ -61,22 +77,25 @@ func (gac *galeraAwareConnector) Driver() driver.Driver { var errUnknownSysVar = &mysql.MySQLError{Number: 1193} -// setGaleraOpts tries SET SESSION wsrep_sync_wait=7. +// setGaleraOpts tries SET SESSION wsrep_sync_wait. // -// This ensures causality checks will take place before executing anything, +// This ensures causality checks will take place before execution, // ensuring that every statement is executed on a fully synced node. // https://mariadb.com/kb/en/galera-cluster-system-variables/#wsrep_sync_wait // // It prevents running into foreign key errors while inserting into linked tables on different MySQL nodes. // Error 1193 "Unknown system variable" is ignored to support MySQL single nodes. -func setGaleraOpts(ctx context.Context, conn driver.Conn) error { - const galeraOpts = "SET SESSION wsrep_sync_wait=7" +func setGaleraOpts(ctx context.Context, conn driver.Conn, wsrepSyncWait int64) error { + const galeraOpts = "SET SESSION wsrep_sync_wait=?" stmt, err := conn.(driver.ConnPrepareContext).PrepareContext(ctx, galeraOpts) if err != nil { err = errors.Wrap(err, "can't prepare "+galeraOpts) - } else if _, err = stmt.(driver.StmtExecContext).ExecContext(ctx, nil); err != nil { - err = errors.Wrap(err, "can't execute "+galeraOpts) + } else { + _, err = stmt.(driver.StmtExecContext).ExecContext(ctx, []driver.NamedValue{{Value: wsrepSyncWait}}) + if err != nil { + err = errors.Wrap(err, "can't execute "+galeraOpts) + } } if err != nil && errors.Is(err, errUnknownSysVar) { diff --git a/pkg/driver/mysql_test.go b/pkg/driver/mysql_test.go index 38afc531a..16587674a 100644 --- a/pkg/driver/mysql_test.go +++ b/pkg/driver/mysql_test.go @@ -96,7 +96,7 @@ func TestSetGaleraOpts(t *testing.T) { for _, st := range subtests { t.Run(st.name, func(t *testing.T) { - assert.ErrorIs(t, setGaleraOpts(context.Background(), &st.input), st.output) + assert.ErrorIs(t, setGaleraOpts(context.Background(), &st.input, 7), st.output) assert.GreaterOrEqual(t, st.input.prepareCalls, uint8(1)) if ts, ok := st.input.preparedStmt.(*testStmt); ok { diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 4ff3e0dde..91901608d 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -54,6 +54,10 @@ type Options struct { // MaxRowsPerTransaction defines the maximum number of rows per transaction. // The default is 2^13, which in our tests showed the best performance in terms of execution time and parallelism. MaxRowsPerTransaction int `yaml:"max_rows_per_transaction" default:"8192"` + + // WsrepSyncWait defines which kinds of SQL statements catch up all pending sync between nodes first, see: + // https://mariadb.com/kb/en/galera-cluster-system-variables/#wsrep_sync_wait + WsrepSyncWait int `yaml:"wsrep_sync_wait" default:"7"` } // Validate checks constraints in the supplied database options and returns an error if they are violated.