diff --git a/pkg/storage/mariadb.go b/pkg/storage/mariadb.go index 4a66f953..1a22816e 100644 --- a/pkg/storage/mariadb.go +++ b/pkg/storage/mariadb.go @@ -114,7 +114,7 @@ func NewMariaDBStream(c config.MariaDBStream, serviceName string) (m *MariaDBStr databases: databases, sqlParser: sqlParser, serviceName: serviceName, - retry: newRetryHandler(db, &c, serviceName), + retry: newRetryHandler(db, &c), tableMetadata: tableMetadata, }, nil @@ -134,11 +134,10 @@ func (m *MariaDBStream) WriteStream(fileName, mimeType string, body io.Reader, t func (m *MariaDBStream) WriteChannel(name, mimeType string, body <-chan StreamEvent, tags map[string]string, dlo bool) (err error) { ctx := context.Background() - defer func() error { - if err := m.retry.commit(ctx); err != nil { - return fmt.Errorf("error committing transaction: %s", err.Error()) + defer func() { + if err = m.retry.commit(ctx); err != nil { + err = fmt.Errorf("error committing transaction: %s", err.Error()) } - return nil }() for { @@ -338,10 +337,8 @@ func (m *MariaDBStream) replicateQuery(ctx context.Context, query string, schema if err != nil { switch err := errors.Cause(err).(type) { case *mysql.MySQLError: - if err.Number == 1049 { - // Unknown database, unset DB. This can be a `create database` query - m.retry.execContext(ctx, "use dummy;", nil) - } else { + // Unknown database == 1049. This can be a `create database` query + if err.Number != 1049 { return fmt.Errorf("cannot change schema: %v", err.Error()) } } @@ -641,9 +638,8 @@ func cleanupLocks(db *sql.DB, cfg *config.MariaDBStream) (removedLocks int, err if errors.Is(err, sql.ErrNoRows) { log.Info("cannot cleanup locks. 'METADATA_LOCK_INFO' plugin is not installed") return 0, nil - } else { - return 0, err } + return 0, err } if lockInfoActive != "ACTIVE" { @@ -1024,14 +1020,13 @@ func isRetryable(err error) bool { } type retryHandler struct { - db *sql.DB - cfg *config.MariaDBStream - tx *sql.Tx - queries []myQuery - serviceName string + db *sql.DB + cfg *config.MariaDBStream + tx *sql.Tx + queries []myQuery } -func newRetryHandler(db *sql.DB, cfg *config.MariaDBStream, serviceName string) (rh *retryHandler) { +func newRetryHandler(db *sql.DB, cfg *config.MariaDBStream) (rh *retryHandler) { return &retryHandler{db: db, cfg: cfg, tx: nil} } @@ -1099,7 +1094,7 @@ func (r *retryHandler) execContext(ctx context.Context, query string, args []int if err == nil { return } - r.tx.Rollback() + _ = r.tx.Rollback() if isRetryable(err) { if err = r.retryTx(ctx); err != nil { diff --git a/pkg/storage/mariadb_test.go b/pkg/storage/mariadb_test.go index b60d1d79..87d182d4 100644 --- a/pkg/storage/mariadb_test.go +++ b/pkg/storage/mariadb_test.go @@ -637,7 +637,7 @@ func setup(t *testing.T, parseSQL, hasRowMetadata bool, hasPrimaryKey bool) (mar ParseSchema: parseSQL, } mock.ExpectBegin() - retryHandler := newRetryHandler(db, &config, "test") + retryHandler := newRetryHandler(db, &config) if err := retryHandler.beginTx(context.TODO()); err != nil { t.Error("test setup failed to create transaction") }