Skip to content

Commit

Permalink
[Stream] Rm unused var, fix lint error
Browse files Browse the repository at this point in the history
  • Loading branch information
IvoGoman committed May 18, 2022
1 parent a417c79 commit 335a08c
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 19 deletions.
31 changes: 13 additions & 18 deletions pkg/storage/mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func NewMariaDBStream(c config.MariaDBStream, serviceName string) (m *MariaDBStr
databases: databases,
sqlParser: sqlParser,
serviceName: serviceName,
retry: newRetryHandler(db, &c, serviceName),
retry: newRetryHandler(db, &c),
tableMetadata: tableMetadata,
}, nil

Expand All @@ -134,11 +134,10 @@ func (m *MariaDBStream) WriteStream(fileName, mimeType string, body io.Reader, t
func (m *MariaDBStream) WriteChannel(name, mimeType string, body <-chan StreamEvent, tags map[string]string, dlo bool) (err error) {
ctx := context.Background()

defer func() error {
if err := m.retry.commit(ctx); err != nil {
return fmt.Errorf("error committing transaction: %s", err.Error())
defer func() {
if err = m.retry.commit(ctx); err != nil {
err = fmt.Errorf("error committing transaction: %s", err.Error())
}
return nil
}()

for {
Expand Down Expand Up @@ -338,10 +337,8 @@ func (m *MariaDBStream) replicateQuery(ctx context.Context, query string, schema
if err != nil {
switch err := errors.Cause(err).(type) {
case *mysql.MySQLError:
if err.Number == 1049 {
// Unknown database, unset DB. This can be a `create database` query
m.retry.execContext(ctx, "use dummy;", nil)
} else {
// Unknown database == 1049. This can be a `create database` query
if err.Number != 1049 {
return fmt.Errorf("cannot change schema: %v", err.Error())
}
}
Expand Down Expand Up @@ -641,9 +638,8 @@ func cleanupLocks(db *sql.DB, cfg *config.MariaDBStream) (removedLocks int, err
if errors.Is(err, sql.ErrNoRows) {
log.Info("cannot cleanup locks. 'METADATA_LOCK_INFO' plugin is not installed")
return 0, nil
} else {
return 0, err
}
return 0, err
}

if lockInfoActive != "ACTIVE" {
Expand Down Expand Up @@ -1024,14 +1020,13 @@ func isRetryable(err error) bool {
}

type retryHandler struct {
db *sql.DB
cfg *config.MariaDBStream
tx *sql.Tx
queries []myQuery
serviceName string
db *sql.DB
cfg *config.MariaDBStream
tx *sql.Tx
queries []myQuery
}

func newRetryHandler(db *sql.DB, cfg *config.MariaDBStream, serviceName string) (rh *retryHandler) {
func newRetryHandler(db *sql.DB, cfg *config.MariaDBStream) (rh *retryHandler) {
return &retryHandler{db: db, cfg: cfg, tx: nil}
}

Expand Down Expand Up @@ -1099,7 +1094,7 @@ func (r *retryHandler) execContext(ctx context.Context, query string, args []int
if err == nil {
return
}
r.tx.Rollback()
_ = r.tx.Rollback()

if isRetryable(err) {
if err = r.retryTx(ctx); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/mariadb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ func setup(t *testing.T, parseSQL, hasRowMetadata bool, hasPrimaryKey bool) (mar
ParseSchema: parseSQL,
}
mock.ExpectBegin()
retryHandler := newRetryHandler(db, &config, "test")
retryHandler := newRetryHandler(db, &config)
if err := retryHandler.beginTx(context.TODO()); err != nil {
t.Error("test setup failed to create transaction")
}
Expand Down

0 comments on commit 335a08c

Please sign in to comment.