Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MySQL driver: on connect try setting wsrep_sync_wait=7, swallow error 1193 #665

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pkg/config/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ func (d *Database) Open(logger *logging.Logger) (*icingadb.DB, error) {

config.DBName = d.Database
config.Timeout = time.Minute
config.Params = map[string]string{"sql_mode": "ANSI_QUOTES"}

config.Params = map[string]string{
"sql_mode": "ANSI_QUOTES",
"wsrep_sync_wait": strconv.FormatInt(int64(d.Options.WsrepSyncWait), 10),
}

tlsConfig, err := d.TlsOptions.MakeConfig(d.Host)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (d Driver) OpenConnector(name string) (driver.Connector, error) {

// Register makes our database Driver available under the name "icingadb-*sql".
func Register(logger *logging.Logger) {
sql.Register(MySQL, &Driver{ctxDriver: &mysql.MySQLDriver{}, Logger: logger})
sql.Register(MySQL, &Driver{ctxDriver: MySQLDriver{}, Logger: logger})
sql.Register(PostgreSQL, &Driver{ctxDriver: PgSQLDriver{}, Logger: logger})
_ = mysql.SetLogger(mysqlLogger(func(v ...interface{}) { logger.Debug(v...) }))
sqlx.BindDriver(PostgreSQL, sqlx.DOLLAR)
Expand Down
119 changes: 119 additions & 0 deletions pkg/driver/mysql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package driver

import (
"context"
"database/sql/driver"
"github.com/go-sql-driver/mysql"
"github.com/pkg/errors"
oxzi marked this conversation as resolved.
Show resolved Hide resolved
"strconv"
)

// MySQLDriver extends mysql.MySQLDriver with auto-SETting Galera cluster options.
type MySQLDriver struct {
mysql.MySQLDriver
}

// Open implements the driver.Driver interface.
func (md MySQLDriver) Open(name string) (driver.Conn, error) {
connector, err := md.OpenConnector(name)
if err != nil {
return nil, err
}

return connector.Connect(context.Background())
}

// OpenConnector implements the driver.DriverContext interface.
func (md MySQLDriver) OpenConnector(name string) (driver.Connector, error) {
var wsrepSyncWait int64

if config, err := mysql.ParseDSN(name); err == nil {
if s, ok := config.Params["wsrep_sync_wait"]; ok {
if i, err := strconv.ParseInt(s, 10, 64); err == nil {
// MySQL single nodes don't know wsrep_sync_wait and fail with error 1193 "Unknown system variable".
// We have to SET it manually later and swallow error 1193 not to fail our connections.
wsrepSyncWait = i
delete(config.Params, "wsrep_sync_wait")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Storing wsrep_sync_wait inside the config.Params feels a bit like a cheat, especially as it will be deleted at this point. Maybe add at least a short comment explaining this?

name = config.FormatDSN()
}
}
yhabteab marked this conversation as resolved.
Show resolved Hide resolved
Comment on lines +30 to +39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should actually be possible to completely avoid the DSN for connection to a database by chaining these functions:

This should then allow simply passing the config option as a struct member instead.

(And maybe this could also allow us to avoid having to register the custom "icingadb-mysql" driver at all.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why just MySQL? Our Pg driver has a NewConnector() as well.

And what's the problem in general? Describing a connection to a database is the DSN's purpose.

}

connector, err := md.MySQLDriver.OpenConnector(name)
if err != nil {
return nil, err
}

return &galeraAwareConnector{connector, wsrepSyncWait, md}, nil
}

// galeraAwareConnector extends mysql.connector with auto-SETting Galera cluster options.
type galeraAwareConnector struct {
driver.Connector

wsrepSyncWait int64
driver driver.Driver
}

// Connect implements the driver.Connector interface.
func (gac *galeraAwareConnector) Connect(ctx context.Context) (driver.Conn, error) {
conn, err := gac.Connector.Connect(ctx)
if err != nil {
return nil, err
}

if err := setGaleraOpts(ctx, conn, gac.wsrepSyncWait); err != nil {
_ = conn.Close()
return nil, err
}

return conn, nil
}

// Driver implements the driver.Connector interface.
func (gac *galeraAwareConnector) Driver() driver.Driver {
return gac.driver
}

var errUnknownSysVar = &mysql.MySQLError{Number: 1193}

// setGaleraOpts tries SET SESSION wsrep_sync_wait.
//
// This ensures causality checks will take place before execution,
// ensuring that every statement is executed on a fully synced node.
// https://mariadb.com/kb/en/galera-cluster-system-variables/#wsrep_sync_wait
//
// It prevents running into foreign key errors while inserting into linked tables on different MySQL nodes.
// Error 1193 "Unknown system variable" is ignored to support MySQL single nodes.
func setGaleraOpts(ctx context.Context, conn driver.Conn, wsrepSyncWait int64) error {
const galeraOpts = "SET SESSION wsrep_sync_wait=?"

stmt, err := conn.(driver.ConnPrepareContext).PrepareContext(ctx, galeraOpts)
if err != nil {
err = errors.Wrap(err, "can't prepare "+galeraOpts)
} else {
_, err = stmt.(driver.StmtExecContext).ExecContext(ctx, []driver.NamedValue{{Value: wsrepSyncWait}})
if err != nil {
err = errors.Wrap(err, "can't execute "+galeraOpts)
}
}

if err != nil && errors.Is(err, errUnknownSysVar) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if err != nil && errors.Is(err, errUnknownSysVar) {
if errors.Is(err, errUnknownSysVar) {

errors.Is performs the nil check for you.

err = nil
}

if stmt != nil {
if errClose := stmt.Close(); errClose != nil && err == nil {
err = errors.Wrap(errClose, "can't close statement "+galeraOpts)
}
}

return err
}

// Assert interface compliance.
var (
_ driver.Driver = MySQLDriver{}
_ driver.DriverContext = MySQLDriver{}
_ driver.Connector = (*galeraAwareConnector)(nil)
)
178 changes: 178 additions & 0 deletions pkg/driver/mysql_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package driver

import (
"context"
"database/sql/driver"
"github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/assert"
"io"
"os"
"testing"
)

func TestSetGaleraOpts(t *testing.T) {
tolerated := &mysql.MySQLError{
Number: errUnknownSysVar.Number,
SQLState: [5]byte{255, 0, 42, 23, 7}, // Shall not confuse error comparison
Message: "This unusual text shall not confuse error comparison.",
}

almostTolerated := &mysql.MySQLError{}
*almostTolerated = *tolerated
almostTolerated.Number--

notTolerated := io.EOF
ignoredCodeLocation := os.ErrPermission

subtests := []struct {
name string
input testConn
output error
}{{
name: "Conn PrepareContext returns error",
input: testConn{prepareError: notTolerated},
output: notTolerated,
}, {
name: "Conn PrepareContext returns MySQLError",
input: testConn{prepareError: almostTolerated},
output: almostTolerated,
}, {
name: "Conn PrepareContext returns MySQLError 1193",
input: testConn{prepareError: tolerated},
output: nil,
}, {
name: "Stmt ExecContext returns error",
input: testConn{preparedStmt: &testStmt{
execError: notTolerated,
}},
output: notTolerated,
}, {
name: "Stmt ExecContext and Stmt Close return error",
input: testConn{preparedStmt: &testStmt{
execError: notTolerated,
closeError: ignoredCodeLocation,
}},
output: notTolerated,
}, {
name: "Stmt ExecContext returns MySQLError",
input: testConn{preparedStmt: &testStmt{
execError: almostTolerated,
}},
output: almostTolerated,
}, {
name: "Stmt ExecContext returns MySQLError and Stmt Close returns error",
input: testConn{preparedStmt: &testStmt{
execError: almostTolerated,
closeError: ignoredCodeLocation,
}},
output: almostTolerated,
}, {
name: "Stmt ExecContext returns MySQLError 1193",
input: testConn{preparedStmt: &testStmt{
execError: tolerated,
}},
output: nil,
}, {
name: "Stmt ExecContext and Stmt Close return MySQLError 1193",
input: testConn{preparedStmt: &testStmt{
execError: tolerated,
closeError: tolerated,
}},
output: tolerated,
}, {
name: "Stmt Close returns MySQLError 1193",
input: testConn{preparedStmt: &testStmt{
execResult: driver.ResultNoRows,
closeError: tolerated,
}},
output: tolerated,
}, {
name: "no errors",
input: testConn{preparedStmt: &testStmt{
execResult: driver.ResultNoRows,
}},
output: nil,
}}

for _, st := range subtests {
t.Run(st.name, func(t *testing.T) {
assert.ErrorIs(t, setGaleraOpts(context.Background(), &st.input, 7), st.output)
assert.GreaterOrEqual(t, st.input.prepareCalls, uint8(1))

if ts, ok := st.input.preparedStmt.(*testStmt); ok {
assert.GreaterOrEqual(t, ts.execCalls, st.input.prepareCalls)
assert.GreaterOrEqual(t, ts.closeCalls, st.input.prepareCalls)
}
})
}
}

type testStmt struct {
execResult driver.Result
execError error
execCalls uint8
closeError error
closeCalls uint8
}

// Close implements the driver.Stmt interface.
func (ts *testStmt) Close() error {
ts.closeCalls++
return ts.closeError
}

// NumInput implements the driver.Stmt interface.
func (*testStmt) NumInput() int {
panic("don't call me")
}

// Exec implements the driver.Stmt interface.
func (*testStmt) Exec([]driver.Value) (driver.Result, error) {
panic("don't call me")
}

// Query implements the driver.Stmt interface.
func (*testStmt) Query([]driver.Value) (driver.Rows, error) {
panic("don't call me")
}

// ExecContext implements the driver.StmtExecContext interface.
func (ts *testStmt) ExecContext(context.Context, []driver.NamedValue) (driver.Result, error) {
ts.execCalls++
return ts.execResult, ts.execError
}

type testConn struct {
preparedStmt driver.Stmt
prepareError error
prepareCalls uint8
}

// Prepare implements the driver.Conn interface.
func (*testConn) Prepare(string) (driver.Stmt, error) {
panic("don't call me")
}

// Close implements the driver.Conn interface.
func (*testConn) Close() error {
panic("don't call me")
}

// Begin implements the driver.Conn interface.
func (*testConn) Begin() (driver.Tx, error) {
panic("don't call me")
}

// PrepareContext implements the driver.ConnPrepareContext interface.
func (tc *testConn) PrepareContext(context.Context, string) (driver.Stmt, error) {
tc.prepareCalls++
return tc.preparedStmt, tc.prepareError
}

// Assert interface compliance.
var (
_ driver.Conn = (*testConn)(nil)
_ driver.ConnPrepareContext = (*testConn)(nil)
_ driver.Stmt = (*testStmt)(nil)
_ driver.StmtExecContext = (*testStmt)(nil)
)
4 changes: 4 additions & 0 deletions pkg/icingadb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ type Options struct {
// MaxRowsPerTransaction defines the maximum number of rows per transaction.
// The default is 2^13, which in our tests showed the best performance in terms of execution time and parallelism.
MaxRowsPerTransaction int `yaml:"max_rows_per_transaction" default:"8192"`

// WsrepSyncWait defines which kinds of SQL statements catch up all pending sync between nodes first, see:
// https://mariadb.com/kb/en/galera-cluster-system-variables/#wsrep_sync_wait
WsrepSyncWait int `yaml:"wsrep_sync_wait" default:"7"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should also be documented in a place that users can easily find. The current example config does not contain an options block. This could perhaps be added, as for many people a look at the default config is a starting point.

Textual documentation would also be helpful. Perhaps also referring to the HA area?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current example config does not contain an options block.

That's exactly my reason for not documenting it (yet).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reasoning for the existing options was that those those should not need to be changed by users. The default values should work fine everywhere, they were merely exposed to the config so that we might try to tweak them even in some production environment if problems would arise (but so far, those defaults seem to have worked out fine).

}

// Validate checks constraints in the supplied database options and returns an error if they are violated.
Expand Down
Loading