Skip to content

Commit

Permalink
feat: added optional handler for unknown job types
Browse files Browse the repository at this point in the history
  • Loading branch information
vgarvardt committed Oct 10, 2023
1 parent a2b1222 commit 906d4af
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 54 deletions.
53 changes: 32 additions & 21 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ type Worker struct {
tracer trace.Tracer
meter metric.Meter

unknownJobTypeWF WorkFunc

hooksJobLocked []HookFunc
hooksUnknownJobType []HookFunc
hooksJobDone []HookFunc
Expand Down Expand Up @@ -218,24 +220,8 @@ func (w *Worker) WorkOne(ctx context.Context) (didWork bool) {

ll := w.logger.With(adapter.F("job-id", j.ID.String()), adapter.F("job-type", j.Type))

defer func() {
if err := j.Done(ctx); err != nil {
span.RecordError(fmt.Errorf("failed to mark job as done: %w", err))
ll.Error("Failed to mark job as done", adapter.Err(err))

// let user handle critical job failure
for _, hook := range w.hooksJobUndone {
hook(ctx, j, err)
}
}

w.mDuration.Record(
ctx,
time.Since(processingStartedAt).Milliseconds(),
metric.WithAttributes(attrJobType.String(j.Type)),
)
}()
defer w.recoverPanic(ctx, ll, j)
defer w.markJobDone(ctx, j, processingStartedAt, span, ll)
defer w.recoverPanic(ctx, j, ll)

for _, hook := range w.hooksJobLocked {
hook(ctx, j, nil)
Expand All @@ -245,8 +231,12 @@ func (w *Worker) WorkOne(ctx context.Context) (didWork bool) {

wf, ok := w.wm[j.Type]
if !ok {
w.handleUnknownJobType(ctx, j, span, ll)
return
if w.unknownJobTypeWF == nil {
w.handleUnknownJobType(ctx, j, span, ll)
return
}

wf = w.unknownJobTypeWF
}

handlerCtx := ctx
Expand Down Expand Up @@ -323,9 +313,27 @@ func (w *Worker) initMetrics() (err error) {
return nil
}

func (w *Worker) markJobDone(ctx context.Context, j *Job, processingStartedAt time.Time, span trace.Span, ll adapter.Logger) {
if err := j.Done(ctx); err != nil {
span.RecordError(fmt.Errorf("failed to mark job as done: %w", err))
ll.Error("Failed to mark job as done", adapter.Err(err))

// let user handle critical job failure
for _, hook := range w.hooksJobUndone {
hook(ctx, j, err)
}

Check warning on line 324 in worker.go

View check run for this annotation

Codecov / codecov/patch

worker.go#L323-L324

Added lines #L323 - L324 were not covered by tests
}

w.mDuration.Record(
ctx,
time.Since(processingStartedAt).Milliseconds(),
metric.WithAttributes(attrJobType.String(j.Type)),
)
}

// recoverPanic tries to handle panics in job execution.
// A stacktrace is stored into Job last_error.
func (w *Worker) recoverPanic(ctx context.Context, logger adapter.Logger, j *Job) {
func (w *Worker) recoverPanic(ctx context.Context, j *Job, logger adapter.Logger) {
if r := recover(); r != nil {
ctx, span := w.tracer.Start(ctx, "Worker.recoverPanic")
defer span.End()
Expand Down Expand Up @@ -376,6 +384,8 @@ type WorkerPool struct {
tracer trace.Tracer
meter metric.Meter

unknownJobTypeWF WorkFunc

hooksJobLocked []HookFunc
hooksUnknownJobType []HookFunc
hooksJobDone []HookFunc
Expand Down Expand Up @@ -431,6 +441,7 @@ func NewWorkerPool(c *Client, wm WorkMap, poolSize int, options ...WorkerPoolOpt
WithWorkerPanicStackBufSize(w.panicStackBufSize),
WithWorkerSpanWorkOneNoJob(w.spanWorkOneNoJob),
WithWorkerJobTTL(w.jobTTL),
WithWorkerUnknownJobWorkFunc(w.unknownJobTypeWF),
)

if err != nil {
Expand Down
22 changes: 22 additions & 0 deletions worker_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ func WithWorkerHooksJobLocked(hooks ...HookFunc) WorkerOption {
// WithWorkerHooksUnknownJobType sets hooks that are called when worker finds a job with unknown type.
// Error field for this event type is always set since this is an error situation.
// If this hook is called - no other lifecycle hooks will be called for the job.
// When the handler for unknown job types is set with WithWorkerUnknownJobWorkFunc - these hooks are never called
// as the job is handled in the regular way using that handler.
func WithWorkerHooksUnknownJobType(hooks ...HookFunc) WorkerOption {
return func(w *Worker) {
w.hooksUnknownJobType = hooks
Expand Down Expand Up @@ -145,6 +147,15 @@ func WithWorkerJobTTL(d time.Duration) WorkerOption {
}
}

// WithWorkerUnknownJobWorkFunc sets the handler for unknown job types.
// When the handler is set - hooks set with WithWorkerHooksUnknownJobType are never called as the job is
// handled in the regular way.
func WithWorkerUnknownJobWorkFunc(wf WorkFunc) WorkerOption {
return func(w *Worker) {
w.unknownJobTypeWF = wf
}
}

// WithPoolPollInterval overrides default poll interval with the given value.
// Poll interval is the "sleep" duration if there were no jobs found in the DB.
func WithPoolPollInterval(d time.Duration) WorkerPoolOption {
Expand Down Expand Up @@ -203,6 +214,8 @@ func WithPoolHooksJobLocked(hooks ...HookFunc) WorkerPoolOption {
}

// WithPoolHooksUnknownJobType calls WithWorkerHooksUnknownJobType for every worker in the pool.
// When the handler for unknown job types is set with WithPoolUnknownJobWorkFunc - these hooks are never called
// as the job is handled in the regular way using that handler.
func WithPoolHooksUnknownJobType(hooks ...HookFunc) WorkerPoolOption {
return func(w *WorkerPool) {
w.hooksUnknownJobType = hooks
Expand Down Expand Up @@ -257,3 +270,12 @@ func WithPoolJobTTL(d time.Duration) WorkerPoolOption {
w.jobTTL = d
}
}

// WithPoolUnknownJobWorkFunc sets the handler for unknown job types.
// When the handler is set - hooks set with WithPoolHooksUnknownJobType are never called as the job is
// handled in the regular way.
func WithPoolUnknownJobWorkFunc(wf WorkFunc) WorkerPoolOption {
return func(w *WorkerPool) {
w.unknownJobTypeWF = wf
}
}
48 changes: 48 additions & 0 deletions worker_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,27 @@ func TestWithWorkerPanicStackBufSize(t *testing.T) {
assert.Equal(t, 1234, workerWithCustomSize.panicStackBufSize)
}

func TestWithWorkerUnknownJobWorkFunc(t *testing.T) {
workerWithoutHandler, err := NewWorker(nil, dummyWM)
require.NoError(t, err)
assert.Nil(t, workerWithoutHandler.unknownJobTypeWF)

var wfCalled int
wf := WorkFunc(func(ctx context.Context, j *Job) error {
wfCalled++
return nil
})

workerWithHandler, err := NewWorker(nil, dummyWM, WithWorkerUnknownJobWorkFunc(wf))
require.NoError(t, err)
require.NotNil(t, workerWithHandler.unknownJobTypeWF)

assert.Equal(t, 0, wfCalled)
err = workerWithHandler.unknownJobTypeWF(nil, nil)
assert.Equal(t, 1, wfCalled)
assert.NoError(t, err)
}

func TestWithPoolPollInterval(t *testing.T) {
workerPoolWithDefaultInterval, err := NewWorkerPool(nil, dummyWM, 2)
require.NoError(t, err)
Expand Down Expand Up @@ -484,3 +505,30 @@ func TestWithPoolJobTTL(t *testing.T) {
assert.Equal(t, 10*time.Minute, w.jobTTL)
}
}

func TestWithPoolUnknownJobWorkFunc(t *testing.T) {
poolWithoutHandler, err := NewWorkerPool(nil, dummyWM, 2)
require.NoError(t, err)
for _, w := range poolWithoutHandler.workers {
assert.Nil(t, w.unknownJobTypeWF)
}

var wfCalled int
wf := WorkFunc(func(ctx context.Context, j *Job) error {
wfCalled++
return nil
})

poolWithHandler, err := NewWorkerPool(nil, dummyWM, 2, WithPoolUnknownJobWorkFunc(wf))
require.NoError(t, err)
require.NotNil(t, poolWithHandler.unknownJobTypeWF)

assert.Equal(t, 0, wfCalled)
for _, w := range poolWithHandler.workers {
require.NotNil(t, w.unknownJobTypeWF)

err := w.unknownJobTypeWF(nil, nil)
assert.NoError(t, err)
}
assert.Equal(t, 2, wfCalled)
}
80 changes: 47 additions & 33 deletions worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/oklog/ulid/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
Expand Down Expand Up @@ -246,39 +247,6 @@ func testWorkerPoolWorkOne(t *testing.T, connPool adapter.ConnPool) {
assert.NoError(t, jobDoneHook.err)
}

func BenchmarkWorker(b *testing.B) {
for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool {
b.Run(name, func(b *testing.B) {
benchmarkWorker(b, openFunc(b))
})
}
}

func benchmarkWorker(b *testing.B, connPool adapter.ConnPool) {
ctx := context.Background()

c, err := NewClient(connPool)
require.NoError(b, err)

w, err := NewWorker(c, WorkMap{"Nil": nilWorker})
require.NoError(b, err)

for i := 0; i < b.N; i++ {
if err := c.Enqueue(ctx, &Job{Type: "Nil"}); err != nil {
b.Fatal(err)
}
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
w.WorkOne(ctx)
}
}

func nilWorker(context.Context, *Job) error {
return nil
}

func TestWorkerWorkReturnsError(t *testing.T) {
for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool {
t.Run(name, func(t *testing.T) {
Expand Down Expand Up @@ -519,6 +487,52 @@ func testWorkerWorkOneTypeNotInMap(t *testing.T, connPool adapter.ConnPool) {
assert.Contains(t, j.LastError.String, `unknown job type: "MyJob"`)
}

func TestWorkerWorkOneUnknownTypeWM(t *testing.T) {
for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool {
t.Run(name, func(t *testing.T) {
testWorkerWorkOneUnknownTypeWM(t, openFunc(t))
})
}
}

func testWorkerWorkOneUnknownTypeWM(t *testing.T, connPool adapter.ConnPool) {
ctx := context.Background()

c, err := NewClient(connPool)
require.NoError(t, err)

wm := WorkMap{}

var unknownWFCalled int
unknownJobTypeHook := new(mockHook)

w, err := NewWorker(
c,
wm,
WithWorkerHooksUnknownJobType(unknownJobTypeHook.handler),
WithWorkerUnknownJobWorkFunc(func(ctx context.Context, j *Job) error {
unknownWFCalled++
return nil
}),
)
require.NoError(t, err)

didWork := w.WorkOne(ctx)
assert.False(t, didWork)

assert.Equal(t, 0, unknownJobTypeHook.called)

// random job type, because we do not really care in this case
err = c.Enqueue(ctx, &Job{Type: ulid.MustNew(ulid.Now(), ulid.DefaultEntropy()).String()})
require.NoError(t, err)

didWork = w.WorkOne(ctx)
assert.True(t, didWork)

assert.Equal(t, 0, unknownJobTypeHook.called)
assert.Equal(t, 1, unknownWFCalled)
}

// TestWorker_WorkOne_errorHookTx tests that JobDone hooks are running in the same transaction as the errored job
func TestWorker_WorkOneErrorHookTx(t *testing.T) {
for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool {
Expand Down

0 comments on commit 906d4af

Please sign in to comment.