Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added optional handler for unknown job types #227

Merged
merged 1 commit into from
Oct 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ type Worker struct {
tracer trace.Tracer
meter metric.Meter

unknownJobTypeWF WorkFunc

hooksJobLocked []HookFunc
hooksUnknownJobType []HookFunc
hooksJobDone []HookFunc
Expand Down Expand Up @@ -229,8 +231,12 @@ func (w *Worker) WorkOne(ctx context.Context) (didWork bool) {

wf, ok := w.wm[j.Type]
if !ok {
w.handleUnknownJobType(ctx, j, span, ll)
return
if w.unknownJobTypeWF == nil {
w.handleUnknownJobType(ctx, j, span, ll)
return
}

wf = w.unknownJobTypeWF
}

handlerCtx := ctx
Expand Down Expand Up @@ -378,6 +384,8 @@ type WorkerPool struct {
tracer trace.Tracer
meter metric.Meter

unknownJobTypeWF WorkFunc

hooksJobLocked []HookFunc
hooksUnknownJobType []HookFunc
hooksJobDone []HookFunc
Expand Down Expand Up @@ -433,6 +441,7 @@ func NewWorkerPool(c *Client, wm WorkMap, poolSize int, options ...WorkerPoolOpt
WithWorkerPanicStackBufSize(w.panicStackBufSize),
WithWorkerSpanWorkOneNoJob(w.spanWorkOneNoJob),
WithWorkerJobTTL(w.jobTTL),
WithWorkerUnknownJobWorkFunc(w.unknownJobTypeWF),
)

if err != nil {
Expand Down
22 changes: 22 additions & 0 deletions worker_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ func WithWorkerHooksJobLocked(hooks ...HookFunc) WorkerOption {
// WithWorkerHooksUnknownJobType sets hooks that are called when worker finds a job with unknown type.
// Error field for this event type is always set since this is an error situation.
// If this hook is called - no other lifecycle hooks will be called for the job.
// When the handler for unknown job types is set with WithWorkerUnknownJobWorkFunc - these hooks are never called
// as the job is handled in the regular way using that handler.
func WithWorkerHooksUnknownJobType(hooks ...HookFunc) WorkerOption {
return func(w *Worker) {
w.hooksUnknownJobType = hooks
Expand Down Expand Up @@ -145,6 +147,15 @@ func WithWorkerJobTTL(d time.Duration) WorkerOption {
}
}

// WithWorkerUnknownJobWorkFunc sets the handler for unknown job types.
// When the handler is set - hooks set with WithWorkerHooksUnknownJobType are never called as the job is
// handled in the regular way.
func WithWorkerUnknownJobWorkFunc(wf WorkFunc) WorkerOption {
return func(w *Worker) {
w.unknownJobTypeWF = wf
}
}

// WithPoolPollInterval overrides default poll interval with the given value.
// Poll interval is the "sleep" duration if there were no jobs found in the DB.
func WithPoolPollInterval(d time.Duration) WorkerPoolOption {
Expand Down Expand Up @@ -203,6 +214,8 @@ func WithPoolHooksJobLocked(hooks ...HookFunc) WorkerPoolOption {
}

// WithPoolHooksUnknownJobType calls WithWorkerHooksUnknownJobType for every worker in the pool.
// When the handler for unknown job types is set with WithPoolUnknownJobWorkFunc - these hooks are never called
// as the job is handled in the regular way using that handler.
func WithPoolHooksUnknownJobType(hooks ...HookFunc) WorkerPoolOption {
return func(w *WorkerPool) {
w.hooksUnknownJobType = hooks
Expand Down Expand Up @@ -257,3 +270,12 @@ func WithPoolJobTTL(d time.Duration) WorkerPoolOption {
w.jobTTL = d
}
}

// WithPoolUnknownJobWorkFunc sets the handler for unknown job types.
// When the handler is set - hooks set with WithPoolHooksUnknownJobType are never called as the job is
// handled in the regular way.
func WithPoolUnknownJobWorkFunc(wf WorkFunc) WorkerPoolOption {
return func(w *WorkerPool) {
w.unknownJobTypeWF = wf
}
}
48 changes: 48 additions & 0 deletions worker_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,27 @@ func TestWithWorkerPanicStackBufSize(t *testing.T) {
assert.Equal(t, 1234, workerWithCustomSize.panicStackBufSize)
}

func TestWithWorkerUnknownJobWorkFunc(t *testing.T) {
workerWithoutHandler, err := NewWorker(nil, dummyWM)
require.NoError(t, err)
assert.Nil(t, workerWithoutHandler.unknownJobTypeWF)

var wfCalled int
wf := WorkFunc(func(ctx context.Context, j *Job) error {
wfCalled++
return nil
})

workerWithHandler, err := NewWorker(nil, dummyWM, WithWorkerUnknownJobWorkFunc(wf))
require.NoError(t, err)
require.NotNil(t, workerWithHandler.unknownJobTypeWF)

assert.Equal(t, 0, wfCalled)
err = workerWithHandler.unknownJobTypeWF(nil, nil)
assert.Equal(t, 1, wfCalled)
assert.NoError(t, err)
}

func TestWithPoolPollInterval(t *testing.T) {
workerPoolWithDefaultInterval, err := NewWorkerPool(nil, dummyWM, 2)
require.NoError(t, err)
Expand Down Expand Up @@ -484,3 +505,30 @@ func TestWithPoolJobTTL(t *testing.T) {
assert.Equal(t, 10*time.Minute, w.jobTTL)
}
}

func TestWithPoolUnknownJobWorkFunc(t *testing.T) {
poolWithoutHandler, err := NewWorkerPool(nil, dummyWM, 2)
require.NoError(t, err)
for _, w := range poolWithoutHandler.workers {
assert.Nil(t, w.unknownJobTypeWF)
}

var wfCalled int
wf := WorkFunc(func(ctx context.Context, j *Job) error {
wfCalled++
return nil
})

poolWithHandler, err := NewWorkerPool(nil, dummyWM, 2, WithPoolUnknownJobWorkFunc(wf))
require.NoError(t, err)
require.NotNil(t, poolWithHandler.unknownJobTypeWF)

assert.Equal(t, 0, wfCalled)
for _, w := range poolWithHandler.workers {
require.NotNil(t, w.unknownJobTypeWF)

err := w.unknownJobTypeWF(nil, nil)
assert.NoError(t, err)
}
assert.Equal(t, 2, wfCalled)
}
47 changes: 47 additions & 0 deletions worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/oklog/ulid/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
Expand Down Expand Up @@ -486,6 +487,52 @@ func testWorkerWorkOneTypeNotInMap(t *testing.T, connPool adapter.ConnPool) {
assert.Contains(t, j.LastError.String, `unknown job type: "MyJob"`)
}

func TestWorkerWorkOneUnknownTypeWM(t *testing.T) {
for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool {
t.Run(name, func(t *testing.T) {
testWorkerWorkOneUnknownTypeWM(t, openFunc(t))
})
}
}

func testWorkerWorkOneUnknownTypeWM(t *testing.T, connPool adapter.ConnPool) {
ctx := context.Background()

c, err := NewClient(connPool)
require.NoError(t, err)

wm := WorkMap{}

var unknownWFCalled int
unknownJobTypeHook := new(mockHook)

w, err := NewWorker(
c,
wm,
WithWorkerHooksUnknownJobType(unknownJobTypeHook.handler),
WithWorkerUnknownJobWorkFunc(func(ctx context.Context, j *Job) error {
unknownWFCalled++
return nil
}),
)
require.NoError(t, err)

didWork := w.WorkOne(ctx)
assert.False(t, didWork)

assert.Equal(t, 0, unknownJobTypeHook.called)

// random job type, because we do not really care in this case
err = c.Enqueue(ctx, &Job{Type: ulid.MustNew(ulid.Now(), ulid.DefaultEntropy()).String()})
require.NoError(t, err)

didWork = w.WorkOne(ctx)
assert.True(t, didWork)

assert.Equal(t, 0, unknownJobTypeHook.called)
assert.Equal(t, 1, unknownWFCalled)
}

// TestWorker_WorkOne_errorHookTx tests that JobDone hooks are running in the same transaction as the errored job
func TestWorker_WorkOneErrorHookTx(t *testing.T) {
for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool {
Expand Down
Loading