diff --git a/worker.go b/worker.go index 2f4ec9c..edb8188 100644 --- a/worker.go +++ b/worker.go @@ -81,6 +81,8 @@ type Worker struct { tracer trace.Tracer meter metric.Meter + unknownJobTypeWF WorkFunc + hooksJobLocked []HookFunc hooksUnknownJobType []HookFunc hooksJobDone []HookFunc @@ -218,24 +220,8 @@ func (w *Worker) WorkOne(ctx context.Context) (didWork bool) { ll := w.logger.With(adapter.F("job-id", j.ID.String()), adapter.F("job-type", j.Type)) - defer func() { - if err := j.Done(ctx); err != nil { - span.RecordError(fmt.Errorf("failed to mark job as done: %w", err)) - ll.Error("Failed to mark job as done", adapter.Err(err)) - - // let user handle critical job failure - for _, hook := range w.hooksJobUndone { - hook(ctx, j, err) - } - } - - w.mDuration.Record( - ctx, - time.Since(processingStartedAt).Milliseconds(), - metric.WithAttributes(attrJobType.String(j.Type)), - ) - }() - defer w.recoverPanic(ctx, ll, j) + defer w.markJobDone(ctx, j, processingStartedAt, span, ll) + defer w.recoverPanic(ctx, j, ll) for _, hook := range w.hooksJobLocked { hook(ctx, j, nil) @@ -245,8 +231,12 @@ func (w *Worker) WorkOne(ctx context.Context) (didWork bool) { wf, ok := w.wm[j.Type] if !ok { - w.handleUnknownJobType(ctx, j, span, ll) - return + if w.unknownJobTypeWF == nil { + w.handleUnknownJobType(ctx, j, span, ll) + return + } + + wf = w.unknownJobTypeWF } handlerCtx := ctx @@ -323,9 +313,27 @@ func (w *Worker) initMetrics() (err error) { return nil } +func (w *Worker) markJobDone(ctx context.Context, j *Job, processingStartedAt time.Time, span trace.Span, ll adapter.Logger) { + if err := j.Done(ctx); err != nil { + span.RecordError(fmt.Errorf("failed to mark job as done: %w", err)) + ll.Error("Failed to mark job as done", adapter.Err(err)) + + // let user handle critical job failure + for _, hook := range w.hooksJobUndone { + hook(ctx, j, err) + } + } + + w.mDuration.Record( + ctx, + time.Since(processingStartedAt).Milliseconds(), + metric.WithAttributes(attrJobType.String(j.Type)), + ) +} + // recoverPanic tries to handle panics in job execution. // A stacktrace is stored into Job last_error. -func (w *Worker) recoverPanic(ctx context.Context, logger adapter.Logger, j *Job) { +func (w *Worker) recoverPanic(ctx context.Context, j *Job, logger adapter.Logger) { if r := recover(); r != nil { ctx, span := w.tracer.Start(ctx, "Worker.recoverPanic") defer span.End() @@ -376,6 +384,8 @@ type WorkerPool struct { tracer trace.Tracer meter metric.Meter + unknownJobTypeWF WorkFunc + hooksJobLocked []HookFunc hooksUnknownJobType []HookFunc hooksJobDone []HookFunc @@ -431,6 +441,7 @@ func NewWorkerPool(c *Client, wm WorkMap, poolSize int, options ...WorkerPoolOpt WithWorkerPanicStackBufSize(w.panicStackBufSize), WithWorkerSpanWorkOneNoJob(w.spanWorkOneNoJob), WithWorkerJobTTL(w.jobTTL), + WithWorkerUnknownJobWorkFunc(w.unknownJobTypeWF), ) if err != nil { diff --git a/worker_option.go b/worker_option.go index 7bfd706..3f31a4b 100644 --- a/worker_option.go +++ b/worker_option.go @@ -80,6 +80,8 @@ func WithWorkerHooksJobLocked(hooks ...HookFunc) WorkerOption { // WithWorkerHooksUnknownJobType sets hooks that are called when worker finds a job with unknown type. // Error field for this event type is always set since this is an error situation. // If this hook is called - no other lifecycle hooks will be called for the job. +// When the handler for unknown job types is set with WithWorkerUnknownJobWorkFunc - these hooks are never called +// as the job is handled in the regular way using that handler. func WithWorkerHooksUnknownJobType(hooks ...HookFunc) WorkerOption { return func(w *Worker) { w.hooksUnknownJobType = hooks @@ -145,6 +147,15 @@ func WithWorkerJobTTL(d time.Duration) WorkerOption { } } +// WithWorkerUnknownJobWorkFunc sets the handler for unknown job types. +// When the handler is set - hooks set with WithWorkerHooksUnknownJobType are never called as the job is +// handled in the regular way. +func WithWorkerUnknownJobWorkFunc(wf WorkFunc) WorkerOption { + return func(w *Worker) { + w.unknownJobTypeWF = wf + } +} + // WithPoolPollInterval overrides default poll interval with the given value. // Poll interval is the "sleep" duration if there were no jobs found in the DB. func WithPoolPollInterval(d time.Duration) WorkerPoolOption { @@ -203,6 +214,8 @@ func WithPoolHooksJobLocked(hooks ...HookFunc) WorkerPoolOption { } // WithPoolHooksUnknownJobType calls WithWorkerHooksUnknownJobType for every worker in the pool. +// When the handler for unknown job types is set with WithPoolUnknownJobWorkFunc - these hooks are never called +// as the job is handled in the regular way using that handler. func WithPoolHooksUnknownJobType(hooks ...HookFunc) WorkerPoolOption { return func(w *WorkerPool) { w.hooksUnknownJobType = hooks @@ -257,3 +270,12 @@ func WithPoolJobTTL(d time.Duration) WorkerPoolOption { w.jobTTL = d } } + +// WithPoolUnknownJobWorkFunc sets the handler for unknown job types. +// When the handler is set - hooks set with WithPoolHooksUnknownJobType are never called as the job is +// handled in the regular way. +func WithPoolUnknownJobWorkFunc(wf WorkFunc) WorkerPoolOption { + return func(w *WorkerPool) { + w.unknownJobTypeWF = wf + } +} diff --git a/worker_option_test.go b/worker_option_test.go index 9a705f6..acfc8c2 100644 --- a/worker_option_test.go +++ b/worker_option_test.go @@ -129,6 +129,26 @@ func TestWithWorkerPanicStackBufSize(t *testing.T) { assert.Equal(t, 1234, workerWithCustomSize.panicStackBufSize) } +func TestWithWorkerUnknownJobWorkFunc(t *testing.T) { + workerWithoutHandler, err := NewWorker(nil, dummyWM) + require.NoError(t, err) + assert.Nil(t, workerWithoutHandler.unknownJobTypeWF) + + var wfCalled int + wf := WorkFunc(func(ctx context.Context, j *Job) error { + wfCalled++ + return nil + }) + + workerWithHandler, err := NewWorker(nil, dummyWM, WithWorkerUnknownJobWorkFunc(wf)) + require.NoError(t, err) + require.NotNil(t, workerWithHandler.unknownJobTypeWF) + + assert.Equal(t, 0, wfCalled) + _ = workerWithHandler.unknownJobTypeWF(nil, nil) + assert.Equal(t, 1, wfCalled) +} + func TestWithPoolPollInterval(t *testing.T) { workerPoolWithDefaultInterval, err := NewWorkerPool(nil, dummyWM, 2) require.NoError(t, err) @@ -484,3 +504,28 @@ func TestWithPoolJobTTL(t *testing.T) { assert.Equal(t, 10*time.Minute, w.jobTTL) } } + +func TestWithPoolUnknownJobWorkFunc(t *testing.T) { + poolWithoutHandler, err := NewWorkerPool(nil, dummyWM, 2) + require.NoError(t, err) + for _, w := range poolWithoutHandler.workers { + assert.Nil(t, w.unknownJobTypeWF) + } + + var wfCalled int + wf := WorkFunc(func(ctx context.Context, j *Job) error { + wfCalled++ + return nil + }) + + poolWithHandler, err := NewWorkerPool(nil, dummyWM, 2, WithPoolUnknownJobWorkFunc(wf)) + require.NoError(t, err) + require.NotNil(t, poolWithHandler.unknownJobTypeWF) + + assert.Equal(t, 0, wfCalled) + for _, w := range poolWithHandler.workers { + require.NotNil(t, w.unknownJobTypeWF) + _ = w.unknownJobTypeWF(nil, nil) + } + assert.Equal(t, 2, wfCalled) +} diff --git a/worker_test.go b/worker_test.go index 3801c97..f60e3f6 100644 --- a/worker_test.go +++ b/worker_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/oklog/ulid/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -246,39 +247,6 @@ func testWorkerPoolWorkOne(t *testing.T, connPool adapter.ConnPool) { assert.NoError(t, jobDoneHook.err) } -func BenchmarkWorker(b *testing.B) { - for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool { - b.Run(name, func(b *testing.B) { - benchmarkWorker(b, openFunc(b)) - }) - } -} - -func benchmarkWorker(b *testing.B, connPool adapter.ConnPool) { - ctx := context.Background() - - c, err := NewClient(connPool) - require.NoError(b, err) - - w, err := NewWorker(c, WorkMap{"Nil": nilWorker}) - require.NoError(b, err) - - for i := 0; i < b.N; i++ { - if err := c.Enqueue(ctx, &Job{Type: "Nil"}); err != nil { - b.Fatal(err) - } - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - w.WorkOne(ctx) - } -} - -func nilWorker(context.Context, *Job) error { - return nil -} - func TestWorkerWorkReturnsError(t *testing.T) { for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool { t.Run(name, func(t *testing.T) { @@ -519,6 +487,52 @@ func testWorkerWorkOneTypeNotInMap(t *testing.T, connPool adapter.ConnPool) { assert.Contains(t, j.LastError.String, `unknown job type: "MyJob"`) } +func TestWorkerWorkOneUnknownTypeWM(t *testing.T) { + for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool { + t.Run(name, func(t *testing.T) { + testWorkerWorkOneUnknownTypeWM(t, openFunc(t)) + }) + } +} + +func testWorkerWorkOneUnknownTypeWM(t *testing.T, connPool adapter.ConnPool) { + ctx := context.Background() + + c, err := NewClient(connPool) + require.NoError(t, err) + + wm := WorkMap{} + + var unknownWFCalled int + unknownJobTypeHook := new(mockHook) + + w, err := NewWorker( + c, + wm, + WithWorkerHooksUnknownJobType(unknownJobTypeHook.handler), + WithWorkerUnknownJobWorkFunc(func(ctx context.Context, j *Job) error { + unknownWFCalled++ + return nil + }), + ) + require.NoError(t, err) + + didWork := w.WorkOne(ctx) + assert.False(t, didWork) + + assert.Equal(t, 0, unknownJobTypeHook.called) + + // random job type, because we do not really care in this case + err = c.Enqueue(ctx, &Job{Type: ulid.MustNew(ulid.Now(), ulid.DefaultEntropy()).String()}) + require.NoError(t, err) + + didWork = w.WorkOne(ctx) + assert.True(t, didWork) + + assert.Equal(t, 0, unknownJobTypeHook.called) + assert.Equal(t, 1, unknownWFCalled) +} + // TestWorker_WorkOne_errorHookTx tests that JobDone hooks are running in the same transaction as the errored job func TestWorker_WorkOneErrorHookTx(t *testing.T) { for name, openFunc := range adapterTesting.AllAdaptersOpenTestPool {