Skip to content

Commit

Permalink
feat: added optional handler for unknown job types
Browse files Browse the repository at this point in the history
  • Loading branch information
vgarvardt committed Oct 14, 2023
1 parent c120d80 commit 9df4ed3
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 2 deletions.
13 changes: 11 additions & 2 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ type Worker struct {
tracer trace.Tracer
meter metric.Meter

unknownJobTypeWF WorkFunc

hooksJobLocked []HookFunc
hooksUnknownJobType []HookFunc
hooksJobDone []HookFunc
Expand Down Expand Up @@ -229,8 +231,12 @@ func (w *Worker) WorkOne(ctx context.Context) (didWork bool) {

wf, ok := w.wm[j.Type]
if !ok {
w.handleUnknownJobType(ctx, j, span, ll)
return
if w.unknownJobTypeWF == nil {
w.handleUnknownJobType(ctx, j, span, ll)
return
}

wf = w.unknownJobTypeWF
}

handlerCtx := ctx
Expand Down Expand Up @@ -378,6 +384,8 @@ type WorkerPool struct {
tracer trace.Tracer
meter metric.Meter

unknownJobTypeWF WorkFunc

hooksJobLocked []HookFunc
hooksUnknownJobType []HookFunc
hooksJobDone []HookFunc
Expand Down Expand Up @@ -433,6 +441,7 @@ func NewWorkerPool(c *Client, wm WorkMap, poolSize int, options ...WorkerPoolOpt
WithWorkerPanicStackBufSize(w.panicStackBufSize),
WithWorkerSpanWorkOneNoJob(w.spanWorkOneNoJob),
WithWorkerJobTTL(w.jobTTL),
WithWorkerUnknownJobWorkFunc(w.unknownJobTypeWF),
)

if err != nil {
Expand Down
22 changes: 22 additions & 0 deletions worker_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ func WithWorkerHooksJobLocked(hooks ...HookFunc) WorkerOption {
// WithWorkerHooksUnknownJobType sets hooks that are called when worker finds a job with unknown type.
// Error field for this event type is always set since this is an error situation.
// If this hook is called - no other lifecycle hooks will be called for the job.
// When the handler for unknown job types is set with WithWorkerUnknownJobWorkFunc - these hooks are never called
// as the job is handled in the regular way using that handler.
func WithWorkerHooksUnknownJobType(hooks ...HookFunc) WorkerOption {
return func(w *Worker) {
w.hooksUnknownJobType = hooks
Expand Down Expand Up @@ -145,6 +147,15 @@ func WithWorkerJobTTL(d time.Duration) WorkerOption {
}
}

// WithWorkerUnknownJobWorkFunc sets the handler for unknown job types.
// When the handler is set - hooks set with WithWorkerHooksUnknownJobType are never called as the job is
// handled in the regular way.
func WithWorkerUnknownJobWorkFunc(wf WorkFunc) WorkerOption {
return func(w *Worker) {
w.unknownJobTypeWF = wf
}
}

// WithPoolPollInterval overrides default poll interval with the given value.
// Poll interval is the "sleep" duration if there were no jobs found in the DB.
func WithPoolPollInterval(d time.Duration) WorkerPoolOption {
Expand Down Expand Up @@ -203,6 +214,8 @@ func WithPoolHooksJobLocked(hooks ...HookFunc) WorkerPoolOption {
}

// WithPoolHooksUnknownJobType calls WithWorkerHooksUnknownJobType for every worker in the pool.
// When the handler for unknown job types is set with WithPoolUnknownJobWorkFunc - these hooks are never called
// as the job is handled in the regular way using that handler.
func WithPoolHooksUnknownJobType(hooks ...HookFunc) WorkerPoolOption {
return func(w *WorkerPool) {
w.hooksUnknownJobType = hooks
Expand Down Expand Up @@ -257,3 +270,12 @@ func WithPoolJobTTL(d time.Duration) WorkerPoolOption {
w.jobTTL = d
}
}

// WithPoolUnknownJobWorkFunc sets the handler for unknown job types.
// When the handler is set - hooks set with WithPoolHooksUnknownJobType are never called as the job is
// handled in the regular way.
func WithPoolUnknownJobWorkFunc(wf WorkFunc) WorkerPoolOption {
return func(w *WorkerPool) {
w.unknownJobTypeWF = wf
}
}
48 changes: 48 additions & 0 deletions worker_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,27 @@ func TestWithWorkerPanicStackBufSize(t *testing.T) {
assert.Equal(t, 1234, workerWithCustomSize.panicStackBufSize)
}

func TestWithWorkerUnknownJobWorkFunc(t *testing.T) {
workerWithoutHandler, err := NewWorker(nil, dummyWM)
require.NoError(t, err)
assert.Nil(t, workerWithoutHandler.unknownJobTypeWF)

var wfCalled int
wf := WorkFunc(func(ctx context.Context, j *Job) error {
wfCalled++
return nil
})

workerWithHandler, err := NewWorker(nil, dummyWM, WithWorkerUnknownJobWorkFunc(wf))
require.NoError(t, err)
require.NotNil(t, workerWithHandler.unknownJobTypeWF)

assert.Equal(t, 0, wfCalled)
err = workerWithHandler.unknownJobTypeWF(nil, nil)
assert.Equal(t, 1, wfCalled)
assert.NoError(t, err)
}

func TestWithPoolPollInterval(t *testing.T) {
workerPoolWithDefaultInterval, err := NewWorkerPool(nil, dummyWM, 2)
require.NoError(t, err)
Expand Down Expand Up @@ -484,3 +505,30 @@ func TestWithPoolJobTTL(t *testing.T) {
assert.Equal(t, 10*time.Minute, w.jobTTL)
}
}

func TestWithPoolUnknownJobWorkFunc(t *testing.T) {
poolWithoutHandler, err := NewWorkerPool(nil, dummyWM, 2)
require.NoError(t, err)
for _, w := range poolWithoutHandler.workers {
assert.Nil(t, w.unknownJobTypeWF)
}

var wfCalled int
wf := WorkFunc(func(ctx context.Context, j *Job) error {
wfCalled++
return nil
})

poolWithHandler, err := NewWorkerPool(nil, dummyWM, 2, WithPoolUnknownJobWorkFunc(wf))
require.NoError(t, err)
require.NotNil(t, poolWithHandler.unknownJobTypeWF)

assert.Equal(t, 0, wfCalled)
for _, w := range poolWithHandler.workers {
require.NotNil(t, w.unknownJobTypeWF)

err := w.unknownJobTypeWF(nil, nil)
assert.NoError(t, err)
}
assert.Equal(t, 2, wfCalled)
}
47 changes: 47 additions & 0 deletions worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/oklog/ulid/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
Expand Down Expand Up @@ -486,6 +487,52 @@ func testWorkerWorkOneTypeNotInMap(t *testing.T, connPool adapter.ConnPool) {
assert.Contains(t, j.LastError.String, `unknown job type: "MyJob"`)
}

func TestWorkerWorkOneUnknownTypeWM(t *testing.T) {
for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool {
t.Run(name, func(t *testing.T) {
testWorkerWorkOneUnknownTypeWM(t, openFunc(t))
})
}
}

func testWorkerWorkOneUnknownTypeWM(t *testing.T, connPool adapter.ConnPool) {
ctx := context.Background()

c, err := NewClient(connPool)
require.NoError(t, err)

wm := WorkMap{}

var unknownWFCalled int
unknownJobTypeHook := new(mockHook)

w, err := NewWorker(
c,
wm,
WithWorkerHooksUnknownJobType(unknownJobTypeHook.handler),
WithWorkerUnknownJobWorkFunc(func(ctx context.Context, j *Job) error {
unknownWFCalled++
return nil
}),
)
require.NoError(t, err)

didWork := w.WorkOne(ctx)
assert.False(t, didWork)

assert.Equal(t, 0, unknownJobTypeHook.called)

// random job type, because we do not really care in this case
err = c.Enqueue(ctx, &Job{Type: ulid.MustNew(ulid.Now(), ulid.DefaultEntropy()).String()})
require.NoError(t, err)

didWork = w.WorkOne(ctx)
assert.True(t, didWork)

assert.Equal(t, 0, unknownJobTypeHook.called)
assert.Equal(t, 1, unknownWFCalled)
}

// TestWorker_WorkOne_errorHookTx tests that JobDone hooks are running in the same transaction as the errored job
func TestWorker_WorkOneErrorHookTx(t *testing.T) {
for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool {
Expand Down

0 comments on commit 9df4ed3

Please sign in to comment.