Skip to content

Commit

Permalink
Merge pull request #6 from agnivade/feature
Browse files Browse the repository at this point in the history
Optimize completion signal and add benchmarks
  • Loading branch information
ivpusic authored Aug 4, 2017
2 parents 3d8061b + 72ce568 commit 28957a2
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 20 deletions.
36 changes: 16 additions & 20 deletions grpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import "sync"
type worker struct {
workerPool chan *worker
jobChannel chan Job
stop chan bool
stop chan struct{}
}

func (w *worker) start() {
Expand All @@ -19,11 +19,9 @@ func (w *worker) start() {
select {
case job = <-w.jobChannel:
job()
case stop := <-w.stop:
if stop {
w.stop <- true
return
}
case <-w.stop:
w.stop <- struct{}{}
return
}
}
}()
Expand All @@ -33,15 +31,15 @@ func newWorker(pool chan *worker) *worker {
return &worker{
workerPool: pool,
jobChannel: make(chan Job),
stop: make(chan bool),
stop: make(chan struct{}),
}
}

// Accepts jobs from clients, and waits for first free worker to deliver job
type dispatcher struct {
workerPool chan *worker
jobQueue chan Job
stop chan bool
stop chan struct{}
}

func (d *dispatcher) dispatch() {
Expand All @@ -50,18 +48,16 @@ func (d *dispatcher) dispatch() {
case job := <-d.jobQueue:
worker := <-d.workerPool
worker.jobChannel <- job
case stop := <-d.stop:
if stop {
for i := 0; i < cap(d.workerPool); i++ {
worker := <-d.workerPool

worker.stop <- true
<-worker.stop
}
case <-d.stop:
for i := 0; i < cap(d.workerPool); i++ {
worker := <-d.workerPool

d.stop <- true
return
worker.stop <- struct{}{}
<-worker.stop
}

d.stop <- struct{}{}
return
}
}
}
Expand All @@ -70,7 +66,7 @@ func newDispatcher(workerPool chan *worker, jobQueue chan Job) *dispatcher {
d := &dispatcher{
workerPool: workerPool,
jobQueue: jobQueue,
stop: make(chan bool),
stop: make(chan struct{}),
}

for i := 0; i < cap(d.workerPool); i++ {
Expand Down Expand Up @@ -129,6 +125,6 @@ func (p *Pool) WaitAll() {

// Will release resources used by pool
func (p *Pool) Release() {
p.dispatcher.stop <- true
p.dispatcher.stop <- struct{}{}
<-p.dispatcher.stop
}
17 changes: 17 additions & 0 deletions grpool_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package grpool

import (
"io/ioutil"
"log"
"runtime"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -84,3 +86,18 @@ func TestRelease(t *testing.T) {

pool.WaitAll()
}

func BenchmarkPool(b *testing.B) {
// Testing with just 1 goroutine
// to benchmark the non-parallel part of the code
pool := NewPool(1, 10)
defer pool.Release()

log.SetOutput(ioutil.Discard)

for n := 0; n < b.N; n++ {
pool.JobQueue <- func() {
log.Printf("I am worker! Number %d\n", n)
}
}
}

0 comments on commit 28957a2

Please sign in to comment.