Skip to content

Commit

Permalink
Add stop on errors option
Browse files Browse the repository at this point in the history
  • Loading branch information
hmoragrega committed Mar 1, 2021
1 parent 966a90d commit c29cced
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 27 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type Config struct {
// Initial indicates the initial number of workers that should be running.
// The default value will be the greater number between 1 or the given minimum.
Initial int

// StopOnErrors indicates whether the pool should stop when job a returns an error.
StopOnErrors bool
}
```

Expand Down
57 changes: 41 additions & 16 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ type Config struct {
// Initial indicates the initial number of workers that should be running.
// The default value will be the greater number between 1 or the given minimum.
Initial int

// StopOnErrors indicates whether the pool should stop when job a returns an error.
StopOnErrors bool
}

// New creates a new pool with the default configuration.
Expand Down Expand Up @@ -146,10 +149,11 @@ func NewWithConfig(cfg Config, middlewares ...Middleware) (*Pool, error) {
}

return &Pool{
min: cfg.Min,
max: cfg.Max,
initial: cfg.Initial,
mws: middlewares,
min: cfg.Min,
max: cfg.Max,
initial: cfg.Initial,
stopOnErrors: cfg.StopOnErrors,
mws: middlewares,
}, nil
}

Expand Down Expand Up @@ -181,9 +185,10 @@ func newDefault(middlewares ...Middleware) *Pool {
// to run a job non-stop concurrently.
type Pool struct {
// config
min int
initial int
max int
min int
initial int
max int
stopOnErrors bool

// job and its workers.
jobBuilder JobBuilder
Expand All @@ -210,6 +215,10 @@ type Pool struct {
// has finished in a clean way.
done chan struct{}

// will contain the error that trigger
// the stop when "stop on errors" is true
workerErr error

mx sync.RWMutex
}

Expand Down Expand Up @@ -245,7 +254,10 @@ func (p *Pool) RunWithBuilder(ctx context.Context, jobBuilder JobBuilder) error
return err
}

<-ctx.Done()
select {
case <-ctx.Done():
case <-p.ctx.Done():
}

return p.Close(context.Background())
}
Expand Down Expand Up @@ -278,8 +290,8 @@ func (p *Pool) start(jobBuilder JobBuilder) error {
wg.Add(initial)
for i := 0; i < initial; i++ {
go func(i int) {
defer wg.Done()
p.workers[i] = p.newWorker()
wg.Done()
}(i)
}

Expand Down Expand Up @@ -399,7 +411,7 @@ func (p *Pool) Close(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case <-p.done:
return nil
return p.workerErr
}
}

Expand Down Expand Up @@ -436,31 +448,44 @@ func (p *Pool) CloseWithTimeout(timeout time.Duration) error {
func (p *Pool) newWorker() *worker {
ctx, cancel := context.WithCancel(p.ctx)
w := &worker{
cancel: cancel,
cancel: cancel,
stopOnErrors: p.stopOnErrors,
}

p.running <- 1
go func() {
defer func() {
p.running <- -1
}()
w.work(ctx, Wrap(p.jobBuilder.New(), p.mws...))
err := w.work(ctx, Wrap(p.jobBuilder.New(), p.mws...))
if err != nil {
p.mx.Lock()
if p.workerErr == nil {
p.workerErr = err
}
p.mx.Unlock()
p.cancel()
}
}()

return w
}

type worker struct {
cancel func()
cancel func()
stopOnErrors bool
}

func (w *worker) work(ctx context.Context, job Job) {
func (w *worker) work(ctx context.Context, job Job) error {
for {
select {
case <-ctx.Done():
return
return nil
default:
_ = job.Do(ctx)
err := job.Do(ctx)
if err != nil && !errors.Is(err, context.Canceled) && w.stopOnErrors {
return err
}
}
}
}
47 changes: 36 additions & 11 deletions pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package workers
import (
"context"
"errors"
"fmt"
"math"
"math/rand"
"sync"
Expand Down Expand Up @@ -229,6 +230,23 @@ func TestPool_Run(t *testing.T) {
}
}

func TestPool_RunStopOnErrors(t *testing.T) {
pool, err := NewWithConfig(Config{StopOnErrors: true})
if err != nil {
t.Fatal("cannot build the pool", err)
}

dummyError := errors.New("some error")
job := JobFunc(func(ctx context.Context) error {
return dummyError
})

err = pool.Run(context.Background(), job)
if !errors.Is(err, dummyError) {
t.Fatalf("unexpected error running the pool, want %v got %v", dummyError, err)
}
}

func TestPool_More(t *testing.T) {
testCases := []struct {
name string
Expand Down Expand Up @@ -462,29 +480,36 @@ func TestPool_ConcurrencySafety(t *testing.T) {
t.Fatal("cannot start pool", err)
}

var wg sync.WaitGroup
wg.Add(2)
errs := make(chan error, 2)

go func() {
go func(t *testing.T) {
for i := 0; i < rand.Intn(total); i++ {
if err := p.More(); err != nil {
t.Fatal("cannot add worker", err)
errs <- fmt.Errorf("cannot add worker: %v", err)
return
}
}
wg.Done()
}()
errs <- nil
}(t)

go func() {
go func(t *testing.T) {
<-startRemoving
for i := 0; i < rand.Intn(total); i++ {
if err := p.Less(); err != nil && !errors.Is(err, ErrMinReached) {
t.Fatal("cannot remove worker", err)
errs <- fmt.Errorf("cannot remove worker: %v", err)
return
}
}
wg.Done()
}()
errs <- nil
}(t)

for i := 0; i < 2; i++ {
err := <-errs
if err != nil {
t.Fatal(err)
}
}

wg.Wait()
if err := p.Close(ctx); err != nil {
t.Fatal("cannot close pool", err)
}
Expand Down

0 comments on commit c29cced

Please sign in to comment.